diff --git a/data/templates/firewall/nftables-defines.j2 b/data/templates/firewall/nftables-defines.j2 index a20c399ae..8a75ab2d6 100644 --- a/data/templates/firewall/nftables-defines.j2 +++ b/data/templates/firewall/nftables-defines.j2 @@ -1,102 +1,123 @@ {% macro groups(group, is_ipv6, is_l3) %} {% if group is vyos_defined %} {% set ip_type = 'ipv6_addr' if is_ipv6 else 'ipv4_addr' %} {% if group.address_group is vyos_defined and not is_ipv6 and is_l3 %} {% for group_name, group_conf in group.address_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set A_{{ group_name }} { type {{ ip_type }} flags interval auto-merge {% if group_conf.address is vyos_defined or includes %} elements = { {{ group_conf.address | nft_nested_group(includes, group.address_group, 'address') | join(",") }} } {% endif %} } {% endfor %} {% endif %} {% if group.ipv6_address_group is vyos_defined and is_ipv6 and is_l3 %} {% for group_name, group_conf in group.ipv6_address_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set A6_{{ group_name }} { type {{ ip_type }} flags interval auto-merge {% if group_conf.address is vyos_defined or includes %} elements = { {{ group_conf.address | nft_nested_group(includes, group.ipv6_address_group, 'address') | join(",") }} } {% endif %} } {% endfor %} {% endif %} {% if group.domain_group is vyos_defined and is_l3 %} {% for name, name_config in group.domain_group.items() %} set D_{{ name }} { type {{ ip_type }} flags interval } {% endfor %} {% endif %} {% if group.mac_group is vyos_defined %} {% for group_name, group_conf in group.mac_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set M_{{ group_name }} { type ether_addr {% if group_conf.mac_address is vyos_defined or includes %} elements = { {{ group_conf.mac_address | nft_nested_group(includes, group.mac_group, 'mac_address') | join(",") }} } {% endif %} } {% endfor %} {% endif %} {% if group.network_group is vyos_defined and not is_ipv6 and is_l3 %} {% for group_name, group_conf in group.network_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set N_{{ group_name }} { type {{ ip_type }} flags interval auto-merge {% if group_conf.network is vyos_defined or includes %} elements = { {{ group_conf.network | nft_nested_group(includes, group.network_group, 'network') | join(",") }} } {% endif %} } {% endfor %} {% endif %} {% if group.ipv6_network_group is vyos_defined and is_ipv6 and is_l3 %} {% for group_name, group_conf in group.ipv6_network_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set N6_{{ group_name }} { type {{ ip_type }} flags interval auto-merge {% if group_conf.network is vyos_defined or includes %} elements = { {{ group_conf.network | nft_nested_group(includes, group.ipv6_network_group, 'network') | join(",") }} } {% endif %} } {% endfor %} {% endif %} {% if group.port_group is vyos_defined and is_l3 %} {% for group_name, group_conf in group.port_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set P_{{ group_name }} { type inet_service flags interval auto-merge {% if group_conf.port is vyos_defined or includes %} elements = { {{ group_conf.port | nft_nested_group(includes, group.port_group, 'port') | join(",") }} } {% endif %} } {% endfor %} {% endif %} {% if group.interface_group is vyos_defined %} {% for group_name, group_conf in group.interface_group.items() %} {% set includes = group_conf.include if group_conf.include is vyos_defined else [] %} set I_{{ group_name }} { type ifname flags interval auto-merge {% if group_conf.interface is vyos_defined or includes %} elements = { {{ group_conf.interface | nft_nested_group(includes, group.interface_group, 'interface') | join(",") }} } {% endif %} } {% endfor %} {% endif %} + +{% if group.dynamic_group is vyos_defined %} +{% if group.dynamic_group.address_group is vyos_defined and not is_ipv6 and is_l3 %} +{% for group_name, group_conf in group.dynamic_group.address_group.items() %} + set DA_{{ group_name }} { + type {{ ip_type }} + flags dynamic, timeout + } +{% endfor %} +{% endif %} + +{% if group.dynamic_group.ipv6_address_group is vyos_defined and is_ipv6 and is_l3 %} +{% for group_name, group_conf in group.dynamic_group.ipv6_address_group.items() %} + set DA6_{{ group_name }} { + type {{ ip_type }} + flags dynamic, timeout + } +{% endfor %} +{% endif %} +{% endif %} + {% endif %} {% endmacro %} diff --git a/interface-definitions/firewall.xml.in b/interface-definitions/firewall.xml.in index a4023058f..662ba24ab 100644 --- a/interface-definitions/firewall.xml.in +++ b/interface-definitions/firewall.xml.in @@ -1,502 +1,531 @@ <?xml version="1.0"?> <interfaceDefinition> <node name="firewall" owner="${vyos_conf_scripts_dir}/firewall.py"> <properties> <priority>199</priority> <help>Firewall</help> </properties> <children> #include <include/firewall/global-options.xml.i> <tagNode name="flowtable"> <properties> <help>Flowtable</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> #include <include/generic-description.xml.i> <leafNode name="interface"> <properties> <help>Interfaces to use this flowtable</help> <completionHelp> <script>${vyos_completion_dir}/list_interfaces</script> </completionHelp> <multi/> </properties> </leafNode> <leafNode name="offload"> <properties> <help>Offloading method</help> <completionHelp> <list>hardware software</list> </completionHelp> <valueHelp> <format>hardware</format> <description>Hardware offload</description> </valueHelp> <valueHelp> <format>software</format> <description>Software offload</description> </valueHelp> <constraint> <regex>(hardware|software)</regex> </constraint> </properties> <defaultValue>software</defaultValue> </leafNode> </children> </tagNode> <node name="group"> <properties> <help>Firewall group</help> </properties> <children> <tagNode name="address-group"> <properties> <help>Firewall address-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> <leafNode name="address"> <properties> <help>Address-group member</help> <valueHelp> <format>ipv4</format> <description>IPv4 address to match</description> </valueHelp> <valueHelp> <format>ipv4range</format> <description>IPv4 range to match (e.g. 10.0.0.1-10.0.0.200)</description> </valueHelp> <constraint> <validator name="ipv4-address"/> <validator name="ipv4-range"/> </constraint> <multi/> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another address-group</help> <completionHelp> <path>firewall group address-group</path> </completionHelp> <multi/> </properties> </leafNode> #include <include/generic-description.xml.i> </children> </tagNode> <tagNode name="domain-group"> <properties> <help>Firewall domain-group</help> <constraint> <regex>[a-zA-Z_][a-zA-Z0-9][\w\-\.]*</regex> </constraint> <constraintErrorMessage>Name of domain-group can only contain alpha-numeric letters, hyphen, underscores and not start with numeric</constraintErrorMessage> </properties> <children> <leafNode name="address"> <properties> <help>Domain-group member</help> <valueHelp> <format>txt</format> <description>Domain address to match</description> </valueHelp> <constraint> <validator name="fqdn"/> </constraint> <multi/> </properties> </leafNode> #include <include/generic-description.xml.i> </children> </tagNode> + <node name="dynamic-group"> + <properties> + <help>Firewall dynamic group</help> + </properties> + <children> + <tagNode name="address-group"> + <properties> + <help>Firewall dynamic address group</help> + <constraint> + <regex>[a-zA-Z0-9][\w\-\.]*</regex> + </constraint> + </properties> + <children> + #include <include/generic-description.xml.i> + </children> + </tagNode> + <tagNode name="ipv6-address-group"> + <properties> + <help>Firewall dynamic IPv6 address group</help> + <constraint> + <regex>[a-zA-Z0-9][\w\-\.]*</regex> + </constraint> + </properties> + <children> + #include <include/generic-description.xml.i> + </children> + </tagNode> + </children> + </node> <tagNode name="interface-group"> <properties> <help>Firewall interface-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> <leafNode name="interface"> <properties> <help>Interface-group member</help> <completionHelp> <script>${vyos_completion_dir}/list_interfaces</script> </completionHelp> <multi/> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another interface-group</help> <completionHelp> <path>firewall group interface-group</path> </completionHelp> <multi/> </properties> </leafNode> #include <include/generic-description.xml.i> </children> </tagNode> <tagNode name="ipv6-address-group"> <properties> <help>Firewall ipv6-address-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> <leafNode name="address"> <properties> <help>Address-group member</help> <valueHelp> <format>ipv6</format> <description>IPv6 address to match</description> </valueHelp> <valueHelp> <format>ipv6range</format> <description>IPv6 range to match (e.g. 2002::1-2002::ff)</description> </valueHelp> <constraint> <validator name="ipv6-address"/> <validator name="ipv6-range"/> </constraint> <multi/> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another ipv6-address-group</help> <completionHelp> <path>firewall group ipv6-address-group</path> </completionHelp> <multi/> </properties> </leafNode> #include <include/generic-description.xml.i> </children> </tagNode> <tagNode name="ipv6-network-group"> <properties> <help>Firewall ipv6-network-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> #include <include/generic-description.xml.i> <leafNode name="network"> <properties> <help>Network-group member</help> <valueHelp> <format>ipv6net</format> <description>IPv6 address to match</description> </valueHelp> <constraint> <validator name="ipv6-prefix"/> </constraint> <multi/> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another ipv6-network-group</help> <completionHelp> <path>firewall group ipv6-network-group</path> </completionHelp> <multi/> </properties> </leafNode> </children> </tagNode> <tagNode name="mac-group"> <properties> <help>Firewall mac-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> #include <include/generic-description.xml.i> <leafNode name="mac-address"> <properties> <help>Mac-group member</help> <valueHelp> <format>macaddr</format> <description>MAC address to match</description> </valueHelp> <constraint> <validator name="mac-address"/> </constraint> <multi/> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another mac-group</help> <completionHelp> <path>firewall group mac-group</path> </completionHelp> <multi/> </properties> </leafNode> </children> </tagNode> <tagNode name="network-group"> <properties> <help>Firewall network-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> #include <include/generic-description.xml.i> <leafNode name="network"> <properties> <help>Network-group member</help> <valueHelp> <format>ipv4net</format> <description>IPv4 Subnet to match</description> </valueHelp> <constraint> <validator name="ipv4-prefix"/> </constraint> <multi/> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another network-group</help> <completionHelp> <path>firewall group network-group</path> </completionHelp> <multi/> </properties> </leafNode> </children> </tagNode> <tagNode name="port-group"> <properties> <help>Firewall port-group</help> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> #include <include/generic-description.xml.i> <leafNode name="port"> <properties> <help>Port-group member</help> <valueHelp> <format>txt</format> <description>Named port (any name in /etc/services, e.g., http)</description> </valueHelp> <valueHelp> <format>u32:1-65535</format> <description>Numbered port</description> </valueHelp> <valueHelp> <format>start-end</format> <description>Numbered port range (e.g. 1001-1050)</description> </valueHelp> <multi/> <constraint> <validator name="port-range"/> </constraint> </properties> </leafNode> <leafNode name="include"> <properties> <help>Include another port-group</help> <completionHelp> <path>firewall group port-group</path> </completionHelp> <multi/> </properties> </leafNode> </children> </tagNode> </children> </node> <node name="bridge"> <properties> <help>Bridge firewall</help> </properties> <children> #include <include/firewall/bridge-hook-forward.xml.i> #include <include/firewall/bridge-custom-name.xml.i> </children> </node> <node name="ipv4"> <properties> <help>IPv4 firewall</help> </properties> <children> #include <include/firewall/ipv4-hook-forward.xml.i> #include <include/firewall/ipv4-hook-input.xml.i> #include <include/firewall/ipv4-hook-output.xml.i> #include <include/firewall/ipv4-custom-name.xml.i> </children> </node> <node name="ipv6"> <properties> <help>IPv6 firewall</help> </properties> <children> #include <include/firewall/ipv6-hook-forward.xml.i> #include <include/firewall/ipv6-hook-input.xml.i> #include <include/firewall/ipv6-hook-output.xml.i> #include <include/firewall/ipv6-custom-name.xml.i> </children> </node> <tagNode name="zone"> <properties> <help>Zone-policy</help> <valueHelp> <format>txt</format> <description>Zone name</description> </valueHelp> <constraint> <regex>[a-zA-Z0-9][\w\-\.]*</regex> </constraint> </properties> <children> #include <include/generic-description.xml.i> #include <include/firewall/default-log.xml.i> <leafNode name="default-action"> <properties> <help>Default-action for traffic coming into this zone</help> <completionHelp> <list>drop reject</list> </completionHelp> <valueHelp> <format>drop</format> <description>Drop silently</description> </valueHelp> <valueHelp> <format>reject</format> <description>Drop and notify source</description> </valueHelp> <constraint> <regex>(drop|reject)</regex> </constraint> </properties> <defaultValue>drop</defaultValue> </leafNode> <tagNode name="from"> <properties> <help>Zone from which to filter traffic</help> <completionHelp> <path>firewall zone</path> </completionHelp> </properties> <children> <node name="firewall"> <properties> <help>Firewall options</help> </properties> <children> <leafNode name="ipv6-name"> <properties> <help>IPv6 firewall ruleset</help> <completionHelp> <path>firewall ipv6 name</path> </completionHelp> </properties> </leafNode> <leafNode name="name"> <properties> <help>IPv4 firewall ruleset</help> <completionHelp> <path>firewall ipv4 name</path> </completionHelp> </properties> </leafNode> </children> </node> </children> </tagNode> <leafNode name="interface"> <properties> <help>Interface associated with zone</help> <valueHelp> <format>txt</format> <description>Interface associated with zone</description> </valueHelp> <valueHelp> <format>vrf</format> <description>VRF associated with zone</description> </valueHelp> <completionHelp> <script>${vyos_completion_dir}/list_interfaces</script> <path>vrf name</path> </completionHelp> <multi/> </properties> </leafNode> <node name="intra-zone-filtering"> <properties> <help>Intra-zone filtering</help> </properties> <children> <leafNode name="action"> <properties> <help>Action for intra-zone traffic</help> <completionHelp> <list>accept drop</list> </completionHelp> <valueHelp> <format>accept</format> <description>Accept traffic</description> </valueHelp> <valueHelp> <format>drop</format> <description>Drop silently</description> </valueHelp> <constraint> <regex>(accept|drop)</regex> </constraint> </properties> </leafNode> <node name="firewall"> <properties> <help>Use the specified firewall chain</help> </properties> <children> <leafNode name="ipv6-name"> <properties> <help>IPv6 firewall ruleset</help> <completionHelp> <path>firewall ipv6 name</path> </completionHelp> </properties> </leafNode> <leafNode name="name"> <properties> <help>IPv4 firewall ruleset</help> <completionHelp> <path>firewall ipv4 name</path> </completionHelp> </properties> </leafNode> </children> </node> </children> </node> <leafNode name="local-zone"> <properties> <help>Zone to be local-zone</help> <valueless/> </properties> </leafNode> </children> </tagNode> </children> </node> </interfaceDefinition> diff --git a/interface-definitions/include/firewall/add-dynamic-address-groups.xml.i b/interface-definitions/include/firewall/add-dynamic-address-groups.xml.i new file mode 100644 index 000000000..769761cb6 --- /dev/null +++ b/interface-definitions/include/firewall/add-dynamic-address-groups.xml.i @@ -0,0 +1,34 @@ +<!-- include start from firewall/add-dynamic-address-groups.xml.i --> +<leafNode name="address-group"> + <properties> + <help>Dynamic address-group</help> + <completionHelp> + <path>firewall group dynamic-group address-group</path> + </completionHelp> + </properties> +</leafNode> +<leafNode name="timeout"> + <properties> + <help>Set timeout</help> + <valueHelp> + <format><number>s</format> + <description>Timeout value in seconds</description> + </valueHelp> + <valueHelp> + <format><number>m</format> + <description>Timeout value in minutes</description> + </valueHelp> + <valueHelp> + <format><number>h</format> + <description>Timeout value in hours</description> + </valueHelp> + <valueHelp> + <format><number>d</format> + <description>Timeout value in days</description> + </valueHelp> + <constraint> + <regex>\d+(s|m|h|d)</regex> + </constraint> + </properties> +</leafNode> +<!-- include end --> \ No newline at end of file diff --git a/interface-definitions/include/firewall/add-dynamic-ipv6-address-groups.xml.i b/interface-definitions/include/firewall/add-dynamic-ipv6-address-groups.xml.i new file mode 100644 index 000000000..7bd91c58a --- /dev/null +++ b/interface-definitions/include/firewall/add-dynamic-ipv6-address-groups.xml.i @@ -0,0 +1,34 @@ +<!-- include start from firewall/add-dynamic-ipv6-address-groups.xml.i --> +<leafNode name="address-group"> + <properties> + <help>Dynamic ipv6-address-group</help> + <completionHelp> + <path>firewall group dynamic-group ipv6-address-group</path> + </completionHelp> + </properties> +</leafNode> +<leafNode name="timeout"> + <properties> + <help>Set timeout</help> + <valueHelp> + <format><number>s</format> + <description>Timeout value in seconds</description> + </valueHelp> + <valueHelp> + <format><number>m</format> + <description>Timeout value in minutes</description> + </valueHelp> + <valueHelp> + <format><number>h</format> + <description>Timeout value in hours</description> + </valueHelp> + <valueHelp> + <format><number>d</format> + <description>Timeout value in days</description> + </valueHelp> + <constraint> + <regex>\d+(s|m|h|d)</regex> + </constraint> + </properties> +</leafNode> +<!-- include end --> \ No newline at end of file diff --git a/interface-definitions/include/firewall/common-rule-ipv4.xml.i b/interface-definitions/include/firewall/common-rule-ipv4.xml.i index 4ed179ae7..158c7a662 100644 --- a/interface-definitions/include/firewall/common-rule-ipv4.xml.i +++ b/interface-definitions/include/firewall/common-rule-ipv4.xml.i @@ -1,72 +1,97 @@ <!-- include start from firewall/common-rule-ipv4.xml.i --> #include <include/firewall/common-rule-inet.xml.i> #include <include/firewall/ttl.xml.i> +<node name="add-address-to-group"> + <properties> + <help>Add ip address to dynamic address-group</help> + </properties> + <children> + <node name="source-address"> + <properties> + <help>Add source ip addresses to dynamic address-group</help> + </properties> + <children> + #include <include/firewall/add-dynamic-address-groups.xml.i> + </children> + </node> + <node name="destination-address"> + <properties> + <help>Add destination ip addresses to dynamic address-group</help> + </properties> + <children> + #include <include/firewall/add-dynamic-address-groups.xml.i> + </children> + </node> + </children> +</node> <node name="destination"> <properties> <help>Destination parameters</help> </properties> <children> #include <include/firewall/address.xml.i> #include <include/firewall/address-mask.xml.i> #include <include/firewall/fqdn.xml.i> #include <include/firewall/geoip.xml.i> #include <include/firewall/mac-address.xml.i> #include <include/firewall/port.xml.i> #include <include/firewall/source-destination-group.xml.i> + #include <include/firewall/source-destination-dynamic-group.xml.i> </children> </node> <node name="icmp"> <properties> <help>ICMP type and code information</help> </properties> <children> <leafNode name="code"> <properties> <help>ICMP code</help> <valueHelp> <format>u32:0-255</format> <description>ICMP code (0-255)</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-255"/> </constraint> </properties> </leafNode> <leafNode name="type"> <properties> <help>ICMP type</help> <valueHelp> <format>u32:0-255</format> <description>ICMP type (0-255)</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-255"/> </constraint> </properties> </leafNode> #include <include/firewall/icmp-type-name.xml.i> </children> </node> <leafNode name="jump-target"> <properties> <help>Set jump target. Action jump must be defined to use this setting</help> <completionHelp> <path>firewall ipv4 name</path> </completionHelp> </properties> </leafNode> <node name="source"> <properties> <help>Source parameters</help> </properties> <children> #include <include/firewall/address.xml.i> #include <include/firewall/address-mask.xml.i> #include <include/firewall/fqdn.xml.i> #include <include/firewall/geoip.xml.i> #include <include/firewall/mac-address.xml.i> #include <include/firewall/port.xml.i> #include <include/firewall/source-destination-group.xml.i> + #include <include/firewall/source-destination-dynamic-group.xml.i> </children> </node> <!-- include end --> \ No newline at end of file diff --git a/interface-definitions/include/firewall/common-rule-ipv6.xml.i b/interface-definitions/include/firewall/common-rule-ipv6.xml.i index 6219557db..78eeb361e 100644 --- a/interface-definitions/include/firewall/common-rule-ipv6.xml.i +++ b/interface-definitions/include/firewall/common-rule-ipv6.xml.i @@ -1,72 +1,97 @@ <!-- include start from firewall/common-rule-ipv6.xml.i --> #include <include/firewall/common-rule-inet.xml.i> #include <include/firewall/hop-limit.xml.i> +<node name="add-address-to-group"> + <properties> + <help>Add ipv6 address to dynamic ipv6-address-group</help> + </properties> + <children> + <node name="source-address"> + <properties> + <help>Add source ipv6 addresses to dynamic ipv6-address-group</help> + </properties> + <children> + #include <include/firewall/add-dynamic-ipv6-address-groups.xml.i> + </children> + </node> + <node name="destination-address"> + <properties> + <help>Add destination ipv6 addresses to dynamic ipv6-address-group</help> + </properties> + <children> + #include <include/firewall/add-dynamic-ipv6-address-groups.xml.i> + </children> + </node> + </children> +</node> <node name="destination"> <properties> <help>Destination parameters</help> </properties> <children> #include <include/firewall/address-ipv6.xml.i> #include <include/firewall/address-mask-ipv6.xml.i> #include <include/firewall/fqdn.xml.i> #include <include/firewall/geoip.xml.i> #include <include/firewall/mac-address.xml.i> #include <include/firewall/port.xml.i> #include <include/firewall/source-destination-group-ipv6.xml.i> + #include <include/firewall/source-destination-dynamic-group-ipv6.xml.i> </children> </node> <node name="icmpv6"> <properties> <help>ICMPv6 type and code information</help> </properties> <children> <leafNode name="code"> <properties> <help>ICMPv6 code</help> <valueHelp> <format>u32:0-255</format> <description>ICMPv6 code (0-255)</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-255"/> </constraint> </properties> </leafNode> <leafNode name="type"> <properties> <help>ICMPv6 type</help> <valueHelp> <format>u32:0-255</format> <description>ICMPv6 type (0-255)</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 0-255"/> </constraint> </properties> </leafNode> #include <include/firewall/icmpv6-type-name.xml.i> </children> </node> <leafNode name="jump-target"> <properties> <help>Set jump target. Action jump must be defined to use this setting</help> <completionHelp> <path>firewall ipv6 name</path> </completionHelp> </properties> </leafNode> <node name="source"> <properties> <help>Source parameters</help> </properties> <children> #include <include/firewall/address-ipv6.xml.i> #include <include/firewall/address-mask-ipv6.xml.i> #include <include/firewall/fqdn.xml.i> #include <include/firewall/geoip.xml.i> #include <include/firewall/mac-address.xml.i> #include <include/firewall/port.xml.i> #include <include/firewall/source-destination-group-ipv6.xml.i> + #include <include/firewall/source-destination-dynamic-group-ipv6.xml.i> </children> </node> <!-- include end --> \ No newline at end of file diff --git a/interface-definitions/include/firewall/source-destination-dynamic-group-ipv6.xml.i b/interface-definitions/include/firewall/source-destination-dynamic-group-ipv6.xml.i new file mode 100644 index 000000000..845f8fe7c --- /dev/null +++ b/interface-definitions/include/firewall/source-destination-dynamic-group-ipv6.xml.i @@ -0,0 +1,17 @@ +<!-- include start from firewall/source-destination-dynamic-group-ipv6.xml.i --> +<node name="group"> + <properties> + <help>Group</help> + </properties> + <children> + <leafNode name="dynamic-address-group"> + <properties> + <help>Group of dynamic ipv6 addresses</help> + <completionHelp> + <path>firewall group dynamic-group ipv6-address-group</path> + </completionHelp> + </properties> + </leafNode> + </children> +</node> +<!-- include end --> diff --git a/interface-definitions/include/firewall/source-destination-dynamic-group.xml.i b/interface-definitions/include/firewall/source-destination-dynamic-group.xml.i new file mode 100644 index 000000000..29ab98c68 --- /dev/null +++ b/interface-definitions/include/firewall/source-destination-dynamic-group.xml.i @@ -0,0 +1,17 @@ +<!-- include start from firewall/source-destination-dynamic-group.xml.i --> +<node name="group"> + <properties> + <help>Group</help> + </properties> + <children> + <leafNode name="dynamic-address-group"> + <properties> + <help>Group of dynamic addresses</help> + <completionHelp> + <path>firewall group dynamic-group address-group</path> + </completionHelp> + </properties> + </leafNode> + </children> +</node> +<!-- include end --> diff --git a/python/vyos/firewall.py b/python/vyos/firewall.py index a2622fa00..1b977b16e 100644 --- a/python/vyos/firewall.py +++ b/python/vyos/firewall.py @@ -1,625 +1,645 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import csv import gzip import os import re from pathlib import Path from socket import AF_INET from socket import AF_INET6 from socket import getaddrinfo from time import strftime from vyos.remote import download from vyos.template import is_ipv4 from vyos.template import render from vyos.utils.dict import dict_search_args from vyos.utils.dict import dict_search_recursive from vyos.utils.process import call from vyos.utils.process import cmd from vyos.utils.process import run # Domain Resolver def fqdn_config_parse(firewall): firewall['ip_fqdn'] = {} firewall['ip6_fqdn'] = {} for domain, path in dict_search_recursive(firewall, 'fqdn'): hook_name = path[1] priority = path[2] fw_name = path[2] rule = path[4] suffix = path[5][0] set_name = f'{hook_name}_{priority}_{rule}_{suffix}' if (path[0] == 'ipv4') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): firewall['ip_fqdn'][set_name] = domain elif (path[0] == 'ipv6') and (path[1] == 'forward' or path[1] == 'input' or path[1] == 'output' or path[1] == 'name'): if path[1] == 'name': set_name = f'name6_{priority}_{rule}_{suffix}' firewall['ip6_fqdn'][set_name] = domain def fqdn_resolve(fqdn, ipv6=False): try: res = getaddrinfo(fqdn, None, AF_INET6 if ipv6 else AF_INET) return set(item[4][0] for item in res) except: return None # End Domain Resolver def find_nftables_rule(table, chain, rule_matches=[]): # Find rule in table/chain that matches all criteria and return the handle results = cmd(f'sudo nft -a list chain {table} {chain}').split("\n") for line in results: if all(rule_match in line for rule_match in rule_matches): handle_search = re.search('handle (\d+)', line) if handle_search: return handle_search[1] return None def remove_nftables_rule(table, chain, handle): cmd(f'sudo nft delete rule {table} {chain} handle {handle}') # Functions below used by template generation def nft_action(vyos_action): if vyos_action == 'accept': return 'return' return vyos_action def parse_rule(rule_conf, hook, fw_name, rule_id, ip_name): output = [] if ip_name == 'ip6': def_suffix = '6' family = 'ipv6' else: def_suffix = '' family = 'bri' if ip_name == 'bri' else 'ipv4' if 'state' in rule_conf and rule_conf['state']: states = ",".join([s for s in rule_conf['state']]) if states: output.append(f'ct state {{{states}}}') if 'conntrack_helper' in rule_conf: helper_map = {'h323': ['RAS', 'Q.931'], 'nfs': ['rpc'], 'sqlnet': ['tns']} helper_out = [] for helper in rule_conf['conntrack_helper']: if helper in helper_map: helper_out.extend(helper_map[helper]) else: helper_out.append(helper) if helper_out: helper_str = ','.join(f'"{s}"' for s in helper_out) output.append(f'ct helper {{{helper_str}}}') if 'connection_status' in rule_conf and rule_conf['connection_status']: status = rule_conf['connection_status'] if status['nat'] == 'destination': nat_status = '{dnat}' output.append(f'ct status {nat_status}') if status['nat'] == 'source': nat_status = '{snat}' output.append(f'ct status {nat_status}') if 'protocol' in rule_conf and rule_conf['protocol'] != 'all': proto = rule_conf['protocol'] operator = '' if proto[0] == '!': operator = '!=' proto = proto[1:] if proto == 'tcp_udp': proto = '{tcp, udp}' output.append(f'meta l4proto {operator} {proto}') for side in ['destination', 'source']: if side in rule_conf: prefix = side[0] side_conf = rule_conf[side] address_mask = side_conf.get('address_mask', None) if 'address' in side_conf: suffix = side_conf['address'] operator = '' exclude = suffix[0] == '!' if exclude: operator = '!= ' suffix = suffix[1:] if address_mask: operator = '!=' if exclude else '==' operator = f'& {address_mask} {operator} ' output.append(f'{ip_name} {prefix}addr {operator}{suffix}') if 'fqdn' in side_conf: fqdn = side_conf['fqdn'] hook_name = '' operator = '' if fqdn[0] == '!': operator = '!=' if hook == 'FWD': hook_name = 'forward' if hook == 'INP': hook_name = 'input' if hook == 'OUT': hook_name = 'output' if hook == 'NAM': hook_name = f'name{def_suffix}' output.append(f'{ip_name} {prefix}addr {operator} @FQDN_{hook_name}_{fw_name}_{rule_id}_{prefix}') if dict_search_args(side_conf, 'geoip', 'country_code'): operator = '' hook_name = '' if dict_search_args(side_conf, 'geoip', 'inverse_match') != None: operator = '!=' if hook == 'FWD': hook_name = 'forward' if hook == 'INP': hook_name = 'input' if hook == 'OUT': hook_name = 'output' if hook == 'NAM': hook_name = f'name' output.append(f'{ip_name} {prefix}addr {operator} @GEOIP_CC{def_suffix}_{hook_name}_{fw_name}_{rule_id}') if 'mac_address' in side_conf: suffix = side_conf["mac_address"] if suffix[0] == '!': suffix = f'!= {suffix[1:]}' output.append(f'ether {prefix}addr {suffix}') if 'port' in side_conf: proto = rule_conf['protocol'] port = side_conf['port'].split(',') ports = [] negated_ports = [] for p in port: if p[0] == '!': negated_ports.append(p[1:]) else: ports.append(p) if proto == 'tcp_udp': proto = 'th' if ports: ports_str = ','.join(ports) output.append(f'{proto} {prefix}port {{{ports_str}}}') if negated_ports: negated_ports_str = ','.join(negated_ports) output.append(f'{proto} {prefix}port != {{{negated_ports_str}}}') if 'group' in side_conf: group = side_conf['group'] if 'address_group' in group: group_name = group['address_group'] operator = '' exclude = group_name[0] == "!" if exclude: operator = '!=' group_name = group_name[1:] if address_mask: operator = '!=' if exclude else '==' operator = f'& {address_mask} {operator}' output.append(f'{ip_name} {prefix}addr {operator} @A{def_suffix}_{group_name}') + elif 'dynamic_address_group' in group: + group_name = group['dynamic_address_group'] + operator = '' + exclude = group_name[0] == "!" + if exclude: + operator = '!=' + group_name = group_name[1:] + output.append(f'{ip_name} {prefix}addr {operator} @DA{def_suffix}_{group_name}') # Generate firewall group domain-group elif 'domain_group' in group: group_name = group['domain_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} @D_{group_name}') elif 'network_group' in group: group_name = group['network_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{ip_name} {prefix}addr {operator} @N{def_suffix}_{group_name}') if 'mac_group' in group: group_name = group['mac_group'] operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'ether {prefix}addr {operator} @M_{group_name}') if 'port_group' in group: proto = rule_conf['protocol'] group_name = group['port_group'] if proto == 'tcp_udp': proto = 'th' operator = '' if group_name[0] == '!': operator = '!=' group_name = group_name[1:] output.append(f'{proto} {prefix}port {operator} @P_{group_name}') if dict_search_args(rule_conf, 'action') == 'synproxy': output.append('ct state invalid,untracked') if 'hop_limit' in rule_conf: operators = {'eq': '==', 'gt': '>', 'lt': '<'} for op, operator in operators.items(): if op in rule_conf['hop_limit']: value = rule_conf['hop_limit'][op] output.append(f'ip6 hoplimit {operator} {value}') if 'inbound_interface' in rule_conf: operator = '' if 'name' in rule_conf['inbound_interface']: iiface = rule_conf['inbound_interface']['name'] if iiface[0] == '!': operator = '!=' iiface = iiface[1:] output.append(f'iifname {operator} {{{iiface}}}') else: iiface = rule_conf['inbound_interface']['group'] if iiface[0] == '!': operator = '!=' iiface = iiface[1:] output.append(f'iifname {operator} @I_{iiface}') if 'outbound_interface' in rule_conf: operator = '' if 'name' in rule_conf['outbound_interface']: oiface = rule_conf['outbound_interface']['name'] if oiface[0] == '!': operator = '!=' oiface = oiface[1:] output.append(f'oifname {operator} {{{oiface}}}') else: oiface = rule_conf['outbound_interface']['group'] if oiface[0] == '!': operator = '!=' oiface = oiface[1:] output.append(f'oifname {operator} @I_{oiface}') if 'ttl' in rule_conf: operators = {'eq': '==', 'gt': '>', 'lt': '<'} for op, operator in operators.items(): if op in rule_conf['ttl']: value = rule_conf['ttl'][op] output.append(f'ip ttl {operator} {value}') for icmp in ['icmp', 'icmpv6']: if icmp in rule_conf: if 'type_name' in rule_conf[icmp]: output.append(icmp + ' type ' + rule_conf[icmp]['type_name']) else: if 'code' in rule_conf[icmp]: output.append(icmp + ' code ' + rule_conf[icmp]['code']) if 'type' in rule_conf[icmp]: output.append(icmp + ' type ' + rule_conf[icmp]['type']) if 'packet_length' in rule_conf: lengths_str = ','.join(rule_conf['packet_length']) output.append(f'ip{def_suffix} length {{{lengths_str}}}') if 'packet_length_exclude' in rule_conf: negated_lengths_str = ','.join(rule_conf['packet_length_exclude']) output.append(f'ip{def_suffix} length != {{{negated_lengths_str}}}') if 'packet_type' in rule_conf: output.append(f'pkttype ' + rule_conf['packet_type']) if 'dscp' in rule_conf: dscp_str = ','.join(rule_conf['dscp']) output.append(f'ip{def_suffix} dscp {{{dscp_str}}}') if 'dscp_exclude' in rule_conf: negated_dscp_str = ','.join(rule_conf['dscp_exclude']) output.append(f'ip{def_suffix} dscp != {{{negated_dscp_str}}}') if 'ipsec' in rule_conf: if 'match_ipsec' in rule_conf['ipsec']: output.append('meta ipsec == 1') if 'match_none' in rule_conf['ipsec']: output.append('meta ipsec == 0') if 'fragment' in rule_conf: # Checking for fragmentation after priority -400 is not possible, # so we use a priority -450 hook to set a mark if 'match_frag' in rule_conf['fragment']: output.append('meta mark 0xffff1') if 'match_non_frag' in rule_conf['fragment']: output.append('meta mark != 0xffff1') if 'limit' in rule_conf: if 'rate' in rule_conf['limit']: output.append(f'limit rate {rule_conf["limit"]["rate"]}') if 'burst' in rule_conf['limit']: output.append(f'burst {rule_conf["limit"]["burst"]} packets') if 'recent' in rule_conf: count = rule_conf['recent']['count'] time = rule_conf['recent']['time'] output.append(f'add @RECENT{def_suffix}_{hook}_{fw_name}_{rule_id} {{ {ip_name} saddr limit rate over {count}/{time} burst {count} packets }}') if 'time' in rule_conf: output.append(parse_time(rule_conf['time'])) tcp_flags = dict_search_args(rule_conf, 'tcp', 'flags') if tcp_flags: output.append(parse_tcp_flags(tcp_flags)) # TCP MSS tcp_mss = dict_search_args(rule_conf, 'tcp', 'mss') if tcp_mss: output.append(f'tcp option maxseg size {tcp_mss}') if 'connection_mark' in rule_conf: conn_mark_str = ','.join(rule_conf['connection_mark']) output.append(f'ct mark {{{conn_mark_str}}}') if 'mark' in rule_conf: mark = rule_conf['mark'] operator = '' if mark[0] == '!': operator = '!=' mark = mark[1:] output.append(f'meta mark {operator} {{{mark}}}') if 'vlan' in rule_conf: if 'id' in rule_conf['vlan']: output.append(f'vlan id {rule_conf["vlan"]["id"]}') if 'priority' in rule_conf['vlan']: output.append(f'vlan pcp {rule_conf["vlan"]["priority"]}') if 'log' in rule_conf: action = rule_conf['action'] if 'action' in rule_conf else 'accept' #output.append(f'log prefix "[{fw_name[:19]}-{rule_id}-{action[:1].upper()}]"') output.append(f'log prefix "[{family}-{hook}-{fw_name}-{rule_id}-{action[:1].upper()}]"') ##{family}-{hook}-{fw_name}-{rule_id} if 'log_options' in rule_conf: if 'level' in rule_conf['log_options']: log_level = rule_conf['log_options']['level'] output.append(f'log level {log_level}') if 'group' in rule_conf['log_options']: log_group = rule_conf['log_options']['group'] output.append(f'log group {log_group}') if 'queue_threshold' in rule_conf['log_options']: queue_threshold = rule_conf['log_options']['queue_threshold'] output.append(f'queue-threshold {queue_threshold}') if 'snapshot_length' in rule_conf['log_options']: log_snaplen = rule_conf['log_options']['snapshot_length'] output.append(f'snaplen {log_snaplen}') output.append('counter') + if 'add_address_to_group' in rule_conf: + for side in ['destination_address', 'source_address']: + if side in rule_conf['add_address_to_group']: + prefix = side[0] + side_conf = rule_conf['add_address_to_group'][side] + dyn_group = side_conf['address_group'] + if 'timeout' in side_conf: + timeout_value = side_conf['timeout'] + output.append(f'set update ip{def_suffix} {prefix}addr timeout {timeout_value} @DA{def_suffix}_{dyn_group}') + else: + output.append(f'set update ip{def_suffix} saddr @DA{def_suffix}_{dyn_group}') + if 'set' in rule_conf: output.append(parse_policy_set(rule_conf['set'], def_suffix)) if 'action' in rule_conf: # Change action=return to action=action # #output.append(nft_action(rule_conf['action'])) if rule_conf['action'] == 'offload': offload_target = rule_conf['offload_target'] output.append(f'flow add @VYOS_FLOWTABLE_{offload_target}') else: output.append(f'{rule_conf["action"]}') if 'jump' in rule_conf['action']: target = rule_conf['jump_target'] output.append(f'NAME{def_suffix}_{target}') if 'queue' in rule_conf['action']: if 'queue' in rule_conf: target = rule_conf['queue'] output.append(f'num {target}') if 'queue_options' in rule_conf: queue_opts = ','.join(rule_conf['queue_options']) output.append(f'{queue_opts}') # Synproxy if 'synproxy' in rule_conf: synproxy_mss = dict_search_args(rule_conf, 'synproxy', 'tcp', 'mss') if synproxy_mss: output.append(f'mss {synproxy_mss}') synproxy_ws = dict_search_args(rule_conf, 'synproxy', 'tcp', 'window_scale') if synproxy_ws: output.append(f'wscale {synproxy_ws} timestamp sack-perm') else: output.append('return') output.append(f'comment "{family}-{hook}-{fw_name}-{rule_id}"') return " ".join(output) def parse_tcp_flags(flags): include = [flag for flag in flags if flag != 'not'] exclude = list(flags['not']) if 'not' in flags else [] return f'tcp flags & ({"|".join(include + exclude)}) == {"|".join(include) if include else "0x0"}' def parse_time(time): out = [] if 'startdate' in time: start = time['startdate'] if 'T' not in start and 'starttime' in time: start += f' {time["starttime"]}' out.append(f'time >= "{start}"') if 'starttime' in time and 'startdate' not in time: out.append(f'hour >= "{time["starttime"]}"') if 'stopdate' in time: stop = time['stopdate'] if 'T' not in stop and 'stoptime' in time: stop += f' {time["stoptime"]}' out.append(f'time < "{stop}"') if 'stoptime' in time and 'stopdate' not in time: out.append(f'hour < "{time["stoptime"]}"') if 'weekdays' in time: days = time['weekdays'].split(",") out_days = [f'"{day}"' for day in days if day[0] != '!'] out.append(f'day {{{",".join(out_days)}}}') return " ".join(out) def parse_policy_set(set_conf, def_suffix): out = [] if 'connection_mark' in set_conf: conn_mark = set_conf['connection_mark'] out.append(f'ct mark set {conn_mark}') if 'dscp' in set_conf: dscp = set_conf['dscp'] out.append(f'ip{def_suffix} dscp set {dscp}') if 'mark' in set_conf: mark = set_conf['mark'] out.append(f'meta mark set {mark}') if 'table' in set_conf: table = set_conf['table'] if table == 'main': table = '254' mark = 0x7FFFFFFF - int(table) out.append(f'meta mark set {mark}') if 'tcp_mss' in set_conf: mss = set_conf['tcp_mss'] out.append(f'tcp option maxseg size set {mss}') return " ".join(out) # GeoIP nftables_geoip_conf = '/run/nftables-geoip.conf' geoip_database = '/usr/share/vyos-geoip/dbip-country-lite.csv.gz' geoip_lock_file = '/run/vyos-geoip.lock' def geoip_load_data(codes=[]): data = None if not os.path.exists(geoip_database): return [] try: with gzip.open(geoip_database, mode='rt') as csv_fh: reader = csv.reader(csv_fh) out = [] for start, end, code in reader: if code.lower() in codes: out.append([start, end, code.lower()]) return out except: print('Error: Failed to open GeoIP database') return [] def geoip_download_data(): url = 'https://download.db-ip.com/free/dbip-country-lite-{}.csv.gz'.format(strftime("%Y-%m")) try: dirname = os.path.dirname(geoip_database) if not os.path.exists(dirname): os.mkdir(dirname) download(geoip_database, url) print("Downloaded GeoIP database") return True except: print("Error: Failed to download GeoIP database") return False class GeoIPLock(object): def __init__(self, file): self.file = file def __enter__(self): if os.path.exists(self.file): return False Path(self.file).touch() return True def __exit__(self, exc_type, exc_value, tb): os.unlink(self.file) def geoip_update(firewall, force=False): with GeoIPLock(geoip_lock_file) as lock: if not lock: print("Script is already running") return False if not firewall: print("Firewall is not configured") return True if not os.path.exists(geoip_database): if not geoip_download_data(): return False elif force: geoip_download_data() ipv4_codes = {} ipv6_codes = {} ipv4_sets = {} ipv6_sets = {} # Map country codes to set names for codes, path in dict_search_recursive(firewall, 'country_code'): set_name = f'GEOIP_CC_{path[1]}_{path[2]}_{path[4]}' if ( path[0] == 'ipv4'): for code in codes: ipv4_codes.setdefault(code, []).append(set_name) elif ( path[0] == 'ipv6' ): set_name = f'GEOIP_CC6_{path[1]}_{path[2]}_{path[4]}' for code in codes: ipv6_codes.setdefault(code, []).append(set_name) if not ipv4_codes and not ipv6_codes: if force: print("GeoIP not in use by firewall") return True geoip_data = geoip_load_data([*ipv4_codes, *ipv6_codes]) # Iterate IP blocks to assign to sets for start, end, code in geoip_data: ipv4 = is_ipv4(start) if code in ipv4_codes and ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv4_codes[code]: ipv4_sets.setdefault(setname, []).append(ip_range) if code in ipv6_codes and not ipv4: ip_range = f'{start}-{end}' if start != end else start for setname in ipv6_codes[code]: ipv6_sets.setdefault(setname, []).append(ip_range) render(nftables_geoip_conf, 'firewall/nftables-geoip-update.j2', { 'ipv4_sets': ipv4_sets, 'ipv6_sets': ipv6_sets }) result = run(f'nft -f {nftables_geoip_conf}') if result != 0: print('Error: GeoIP failed to update firewall') return False return True diff --git a/smoketest/scripts/cli/test_firewall.py b/smoketest/scripts/cli/test_firewall.py index 2be616da1..66684b265 100755 --- a/smoketest/scripts/cli/test_firewall.py +++ b/smoketest/scripts/cli/test_firewall.py @@ -1,793 +1,874 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import unittest from glob import glob from time import sleep from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from vyos.utils.process import run sysfs_config = { 'all_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_all', 'default': '0', 'test_value': 'disable'}, 'broadcast_ping': {'sysfs': '/proc/sys/net/ipv4/icmp_echo_ignore_broadcasts', 'default': '1', 'test_value': 'enable'}, 'ip_src_route': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_source_route', 'default': '0', 'test_value': 'enable'}, 'ipv6_receive_redirects': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, 'ipv6_src_route': {'sysfs': '/proc/sys/net/ipv6/conf/*/accept_source_route', 'default': '-1', 'test_value': 'enable'}, 'log_martians': {'sysfs': '/proc/sys/net/ipv4/conf/all/log_martians', 'default': '1', 'test_value': 'disable'}, 'receive_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/accept_redirects', 'default': '0', 'test_value': 'enable'}, 'send_redirects': {'sysfs': '/proc/sys/net/ipv4/conf/*/send_redirects', 'default': '1', 'test_value': 'disable'}, 'syn_cookies': {'sysfs': '/proc/sys/net/ipv4/tcp_syncookies', 'default': '1', 'test_value': 'disable'}, 'twa_hazards_protection': {'sysfs': '/proc/sys/net/ipv4/tcp_rfc1337', 'default': '0', 'test_value': 'enable'} } class TestFirewall(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestFirewall, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, ['firewall']) @classmethod def tearDownClass(cls): super(TestFirewall, cls).tearDownClass() def tearDown(self): self.cli_delete(['firewall']) self.cli_commit() # Verify chains/sets are cleaned up from nftables nftables_search = [ ['set M_smoketest_mac'], ['set N_smoketest_network'], ['set P_smoketest_port'], ['set D_smoketest_domain'], ['set RECENT_smoketest_4'], ['chain NAME_smoketest'] ] self.verify_nftables(nftables_search, 'ip vyos_filter', inverse=True) def verify_nftables(self, nftables_search, table, inverse=False, args=''): nftables_output = cmd(f'sudo nft {args} list table {table}') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) def verify_nftables_chain(self, nftables_search, table, chain, inverse=False, args=''): nftables_output = cmd(f'sudo nft {args} list chain {table} {chain}') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(not matched if inverse else matched, msg=search) def wait_for_domain_resolver(self, table, set_name, element, max_wait=10): # Resolver no longer blocks commit, need to wait for daemon to populate set count = 0 while count < max_wait: code = run(f'sudo nft get element {table} {set_name} {{ {element} }}') if code == 0: return True count += 1 sleep(1) return False def test_geoip(self): self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'se']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'source', 'geoip', 'country-code', 'gb']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'de']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'country-code', 'fr']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '2', 'source', 'geoip', 'inverse-match']) self.cli_commit() nftables_search = [ ['ip saddr @GEOIP_CC_name_smoketest_1', 'drop'], ['ip saddr != @GEOIP_CC_name_smoketest_2', 'accept'] ] # -t prevents 1000+ GeoIP elements being returned self.verify_nftables(nftables_search, 'ip vyos_filter', args='-t') def test_groups(self): hostmap_path = ['system', 'static-host-mapping', 'host-name'] example_org = ['192.0.2.8', '192.0.2.10', '192.0.2.11'] self.cli_set(hostmap_path + ['example.com', 'inet', '192.0.2.5']) for ips in example_org: self.cli_set(hostmap_path + ['example.org', 'inet', ips]) self.cli_commit() self.cli_set(['firewall', 'group', 'mac-group', 'smoketest_mac', 'mac-address', '00:01:02:03:04:05']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '123']) self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.com']) self.cli_set(['firewall', 'group', 'domain-group', 'smoketest_domain', 'address', 'example.org']) self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'eth0']) self.cli_set(['firewall', 'group', 'interface-group', 'smoketest_interface', 'interface', 'vtun0']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '2', 'source', 'group', 'mac-group', 'smoketest_mac']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'source', 'group', 'domain-group', 'smoketest_domain']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'outbound-interface', 'group', '!smoketest_interface']) self.cli_commit() self.wait_for_domain_resolver('ip vyos_filter', 'D_smoketest_domain', '192.0.2.5') nftables_search = [ ['ip saddr @N_smoketest_network', 'ip daddr 172.16.10.10', 'th dport @P_smoketest_port', 'accept'], ['elements = { 172.16.99.0/24 }'], ['elements = { 53, 123 }'], ['ether saddr @M_smoketest_mac', 'accept'], ['elements = { 00:01:02:03:04:05 }'], ['set D_smoketest_domain'], ['elements = { 192.0.2.5, 192.0.2.8,'], ['192.0.2.10, 192.0.2.11 }'], ['ip saddr @D_smoketest_domain', 'accept'], ['oifname != @I_smoketest_interface', 'accept'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') self.cli_delete(['system', 'static-host-mapping']) self.cli_commit() def test_nested_groups(self): self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'network', '172.16.99.0/24']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'network', '172.16.101.0/24']) self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network1', 'include', 'smoketest_network']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port', 'port', '53']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port1', 'port', '123']) self.cli_set(['firewall', 'group', 'port-group', 'smoketest_port1', 'include', 'smoketest_port']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'source', 'group', 'network-group', 'smoketest_network1']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'destination', 'group', 'port-group', 'smoketest_port1']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_commit() # Test circular includes self.cli_set(['firewall', 'group', 'network-group', 'smoketest_network', 'include', 'smoketest_network1']) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(['firewall', 'group', 'network-group', 'smoketest_network', 'include', 'smoketest_network1']) nftables_search = [ ['ip saddr @N_smoketest_network1', 'th dport @P_smoketest_port1', 'accept'], ['elements = { 172.16.99.0/24, 172.16.101.0/24 }'], ['elements = { 53, 123 }'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_basic_rules(self): name = 'smoketest' interface = 'eth0' interface_inv = '!eth0' interface_wc = 'l2tp*' mss_range = '501-1460' conn_mark = '555' self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'source', 'address', '172.16.20.10']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'destination', 'address', '172.16.10.10']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'log-options', 'level', 'debug']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'ttl', 'eq', '15']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'destination', 'port', '8888']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'log-options', 'level', 'err']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'tcp', 'flags', 'syn']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'tcp', 'flags', 'not', 'ack']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'ttl', 'gt', '102']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'destination', 'port', '22']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'limit', 'rate', '5/minute']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '3', 'log']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'destination', 'port', '22']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'recent', 'count', '10']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'recent', 'time', 'minute']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '4', 'packet-type', 'host']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'tcp', 'flags', 'syn']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'tcp', 'mss', mss_range]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'packet-type', 'broadcast']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '5', 'inbound-interface', 'name', interface_wc]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '6', 'action', 'return']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '6', 'protocol', 'gre']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '6', 'connection-mark', conn_mark]) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '5', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '5', 'protocol', 'gre']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '5', 'outbound-interface', 'name', interface_inv]) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '6', 'action', 'return']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '6', 'protocol', 'icmp']) self.cli_set(['firewall', 'ipv4', 'output', 'filter', 'rule', '6', 'connection-mark', conn_mark]) self.cli_commit() mark_hex = "{0:#010x}".format(int(conn_mark)) nftables_search = [ ['chain VYOS_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['tcp dport 22', 'limit rate 5/minute', 'accept'], ['tcp dport 22', 'add @RECENT_FWD_filter_4 { ip saddr limit rate over 10/minute burst 10 packets }', 'meta pkttype host', 'drop'], ['log prefix "[ipv4-FWD-filter-default-D]"','FWD-filter default-action drop', 'drop'], ['chain VYOS_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['tcp flags & syn == syn', f'tcp option maxseg size {mss_range}', f'iifname "{interface_wc}"', 'meta pkttype broadcast', 'accept'], ['meta l4proto gre', f'ct mark {mark_hex}', 'return'], ['INP-filter default-action accept', 'accept'], ['chain VYOS_OUTPUT_filter'], ['type filter hook output priority filter; policy accept;'], ['meta l4proto gre', f'oifname != "{interface}"', 'drop'], ['meta l4proto icmp', f'ct mark {mark_hex}', 'return'], ['log prefix "[ipv4-OUT-filter-default-D]"','OUT-filter default-action drop', 'drop'], ['chain NAME_smoketest'], ['saddr 172.16.20.10', 'daddr 172.16.10.10', 'log prefix "[ipv4-NAM-smoketest-1-A]" log level debug', 'ip ttl 15', 'accept'], ['tcp flags syn / syn,ack', 'tcp dport 8888', 'log prefix "[ipv4-NAM-smoketest-2-R]" log level err', 'ip ttl > 102', 'reject'], ['log prefix "[ipv4-smoketest-default-D]"','smoketest default-action', 'drop'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_advanced(self): name = 'smoketest-adv' name2 = 'smoketest-adv2' interface = 'eth0' self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'packet-length', '64']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'packet-length', '512']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'packet-length', '1024']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'dscp', '17']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'dscp', '52']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log-options', 'group', '66']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log-options', 'snapshot-length', '6666']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '6', 'log-options', 'queue-threshold','32000']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'packet-length', '1-30000']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'packet-length-exclude', '60000-65535']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'dscp', '3-11']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '7', 'dscp-exclude', '21-25']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'source', 'address', '198.51.100.1']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'mark', '1010']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'jump']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'jump-target', name]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'mark', '!98765']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'action', 'queue']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '2', 'queue', '3']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'protocol', 'udp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'action', 'queue']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'queue-options', 'fanout']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'queue-options', 'bypass']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '3', 'queue', '0-15']) self.cli_commit() nftables_search = [ ['chain VYOS_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['ip saddr 198.51.100.1', 'meta mark 0x000003f2', f'jump NAME_{name}'], ['FWD-filter default-action drop', 'drop'], ['chain VYOS_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['meta mark != 0x000181cd', 'meta l4proto tcp','queue to 3'], ['meta l4proto udp','queue flags bypass,fanout to 0-15'], ['INP-filter default-action accept', 'accept'], [f'chain NAME_{name}'], ['ip length { 64, 512, 1024 }', 'ip dscp { 0x11, 0x34 }', f'log prefix "[ipv4-NAM-{name}-6-A]" log group 66 snaplen 6666 queue-threshold 32000', 'accept'], ['ip length 1-30000', 'ip length != 60000-65535', 'ip dscp 0x03-0x0b', 'ip dscp != 0x15-0x19', 'accept'], [f'log prefix "[ipv4-{name}-default-D]"', 'drop'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_synproxy(self): tcp_mss = '1460' tcp_wscale = '7' dport = '22' self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'protocol', 'tcp']) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'destination', 'port', dport]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'synproxy', 'tcp', 'mss', tcp_mss]) self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'synproxy', 'tcp', 'window-scale', tcp_wscale]) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'synproxy']) self.cli_commit() nftables_search = [ [f'tcp dport {dport} ct state invalid,untracked', f'synproxy mss {tcp_mss} wscale {tcp_wscale} timestamp sack-perm'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv4_mask(self): name = 'smoketest-mask' interface = 'eth0' self.cli_set(['firewall', 'group', 'address-group', 'mask_group', 'address', '1.1.1.1']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'destination', 'address', '0.0.1.2']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'destination', 'address-mask', '0.0.255.255']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'source', 'address', '!0.0.3.4']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'source', 'address-mask', '0.0.255.255']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'source', 'group', 'address-group', 'mask_group']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'source', 'address-mask', '0.0.255.255']) self.cli_commit() nftables_search = [ [f'daddr & 0.0.255.255 == 0.0.1.2'], [f'saddr & 0.0.255.255 != 0.0.3.4'], [f'saddr & 0.0.255.255 == @A_mask_group'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') + def test_ipv4_dynamic_groups(self): + group01 = 'knock01' + group02 = 'allowed' + + self.cli_set(['firewall', 'group', 'dynamic-group', 'address-group', group01]) + self.cli_set(['firewall', 'group', 'dynamic-group', 'address-group', group02]) + + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'action', 'drop']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'protocol', 'tcp']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'destination', 'port', '5151']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'address-group', group01]) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'timeout', '30s']) + + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'action', 'drop']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'protocol', 'tcp']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'destination', 'port', '7272']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'source', 'group', 'dynamic-address-group', group01]) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'address-group', group02]) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'timeout', '5m']) + + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'action', 'accept']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'protocol', 'tcp']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'destination', 'port', '22']) + self.cli_set(['firewall', 'ipv4', 'input', 'filter', 'rule', '30', 'source', 'group', 'dynamic-address-group', group02]) + + self.cli_commit() + + nftables_search = [ + [f'DA_{group01}'], + [f'DA_{group02}'], + ['type ipv4_addr'], + ['flags dynamic,timeout'], + ['chain VYOS_INPUT_filter {'], + ['type filter hook input priority filter', 'policy accept'], + ['tcp dport 5151', f'update @DA_{group01}', '{ ip saddr timeout 30s }', 'drop'], + ['tcp dport 7272', f'ip saddr @DA_{group01}', f'update @DA_{group02}', '{ ip saddr timeout 5m }', 'drop'], + ['tcp dport 22', f'ip saddr @DA_{group02}', 'accept'] + ] + + self.verify_nftables(nftables_search, 'ip vyos_filter') def test_ipv6_basic_rules(self): name = 'v6-smoketest' interface = 'eth0' self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'source', 'address', '2002::1']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'destination', 'address', '2002::1:1']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'log-options', 'level', 'crit']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'default-action', 'accept']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'destination', 'port', '8888']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '2', 'inbound-interface', 'name', interface]) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'default-log']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '3', 'action', 'return']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '3', 'protocol', 'gre']) self.cli_set(['firewall', 'ipv6', 'output', 'filter', 'rule', '3', 'outbound-interface', 'name', interface]) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'protocol', 'udp']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'source', 'address', '2002::1:2']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '3', 'inbound-interface', 'name', interface]) self.cli_commit() nftables_search = [ ['chain VYOS_IPV6_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['meta l4proto { tcp, udp }', 'th dport 8888', f'iifname "{interface}"', 'reject'], ['log prefix "[ipv6-FWD-filter-default-A]"','FWD-filter default-action accept', 'accept'], ['chain VYOS_IPV6_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['meta l4proto udp', 'ip6 saddr 2002::1:2', f'iifname "{interface}"', 'accept'], ['INP-filter default-action accept', 'accept'], ['chain VYOS_IPV6_OUTPUT_filter'], ['type filter hook output priority filter; policy accept;'], ['meta l4proto gre', f'oifname "{interface}"', 'return'], ['log prefix "[ipv6-OUT-filter-default-D]"','OUT-filter default-action drop', 'drop'], [f'chain NAME6_{name}'], ['saddr 2002::1', 'daddr 2002::1:1', 'log prefix "[ipv6-NAM-v6-smoketest-1-A]" log level crit', 'accept'], [f'"{name} default-action drop"', f'log prefix "[ipv6-{name}-default-D]"', 'drop'], ['jump VYOS_STATE_POLICY6'], ['chain VYOS_STATE_POLICY6'], ['ct state established', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_ipv6_advanced(self): name = 'v6-smoketest-adv' name2 = 'v6-smoketest-adv2' interface = 'eth0' self.cli_set(['firewall', 'ipv6', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'packet-length', '65']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'packet-length', '513']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'packet-length', '1025']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'dscp', '18']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'dscp', '53']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'packet-length', '1-1999']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'packet-length-exclude', '60000-65535']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'dscp', '4-14']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '4', 'dscp-exclude', '31-35']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'default-action', 'accept']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'source', 'address', '2001:db8::/64']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'mark', '!6655-7766']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'action', 'jump']) self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '1', 'jump-target', name]) self.cli_commit() nftables_search = [ ['chain VYOS_IPV6_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], ['ip6 length 1-1999', 'ip6 length != 60000-65535', 'ip6 dscp 0x04-0x0e', 'ip6 dscp != 0x1f-0x23', 'accept'], ['chain VYOS_IPV6_INPUT_filter'], ['type filter hook input priority filter; policy accept;'], ['ip6 saddr 2001:db8::/64', 'meta mark != 0x000019ff-0x00001e56', f'jump NAME6_{name}'], [f'chain NAME6_{name}'], ['ip6 length { 65, 513, 1025 }', 'ip6 dscp { af21, 0x35 }', 'accept'], [f'log prefix "[ipv6-{name}-default-D]"', 'drop'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') def test_ipv6_mask(self): name = 'v6-smoketest-mask' interface = 'eth0' self.cli_set(['firewall', 'group', 'ipv6-address-group', 'mask_group', 'address', '::beef']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'default-log']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'destination', 'address', '::1111:2222:3333:4444']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '1', 'destination', 'address-mask', '::ffff:ffff:ffff:ffff']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '2', 'action', 'accept']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '2', 'source', 'address', '!::aaaa:bbbb:cccc:dddd']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '2', 'source', 'address-mask', '::ffff:ffff:ffff:ffff']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'action', 'drop']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'source', 'group', 'address-group', 'mask_group']) self.cli_set(['firewall', 'ipv6', 'name', name, 'rule', '3', 'source', 'address-mask', '::ffff:ffff:ffff:ffff']) self.cli_commit() nftables_search = [ ['daddr & ::ffff:ffff:ffff:ffff == ::1111:2222:3333:4444'], ['saddr & ::ffff:ffff:ffff:ffff != ::aaaa:bbbb:cccc:dddd'], ['saddr & ::ffff:ffff:ffff:ffff == @A6_mask_group'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') + def test_ipv6_dynamic_groups(self): + group01 = 'knock01' + group02 = 'allowed' + + self.cli_set(['firewall', 'group', 'dynamic-group', 'ipv6-address-group', group01]) + self.cli_set(['firewall', 'group', 'dynamic-group', 'ipv6-address-group', group02]) + + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'action', 'drop']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'protocol', 'tcp']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'destination', 'port', '5151']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'address-group', group01]) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '10', 'add-address-to-group', 'source-address', 'timeout', '30s']) + + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'action', 'drop']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'protocol', 'tcp']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'destination', 'port', '7272']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'source', 'group', 'dynamic-address-group', group01]) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'address-group', group02]) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '20', 'add-address-to-group', 'source-address', 'timeout', '5m']) + + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'action', 'accept']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'protocol', 'tcp']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'destination', 'port', '22']) + self.cli_set(['firewall', 'ipv6', 'input', 'filter', 'rule', '30', 'source', 'group', 'dynamic-address-group', group02]) + + self.cli_commit() + + nftables_search = [ + [f'DA6_{group01}'], + [f'DA6_{group02}'], + ['type ipv6_addr'], + ['flags dynamic,timeout'], + ['chain VYOS_IPV6_INPUT_filter {'], + ['type filter hook input priority filter', 'policy accept'], + ['tcp dport 5151', f'update @DA6_{group01}', '{ ip6 saddr timeout 30s }', 'drop'], + ['tcp dport 7272', f'ip6 saddr @DA6_{group01}', f'update @DA6_{group02}', '{ ip6 saddr timeout 5m }', 'drop'], + ['tcp dport 22', f'ip6 saddr @DA6_{group02}', 'accept'] + ] + + self.verify_nftables(nftables_search, 'ip6 vyos_filter') + def test_ipv4_state_and_status_rules(self): name = 'smoketest-state' interface = 'eth0' self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'default-action', 'drop']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'state', 'established']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '1', 'state', 'related']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'action', 'reject']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '2', 'state', 'invalid']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'state', 'new']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '3', 'connection-status', 'nat', 'destination']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'state', 'new']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'state', 'established']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '4', 'connection-status', 'nat', 'source']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'action', 'accept']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'state', 'related']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'conntrack-helper', 'ftp']) self.cli_set(['firewall', 'ipv4', 'name', name, 'rule', '5', 'conntrack-helper', 'pptp']) self.cli_commit() nftables_search = [ ['ct state { established, related }', 'accept'], ['ct state invalid', 'reject'], ['ct state new', 'ct status dnat', 'accept'], ['ct state { established, new }', 'ct status snat', 'accept'], ['ct state related', 'ct helper { "ftp", "pptp" }', 'accept'], ['drop', f'comment "{name} default-action drop"'], ['jump VYOS_STATE_POLICY'], ['chain VYOS_STATE_POLICY'], ['ct state established', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') # Check conntrack self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['return']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') def test_bridge_basic_rules(self): name = 'smoketest' interface_in = 'eth0' mac_address = '00:53:00:00:00:01' vlan_id = '12' vlan_prior = '3' self.cli_set(['firewall', 'bridge', 'name', name, 'default-action', 'accept']) self.cli_set(['firewall', 'bridge', 'name', name, 'default-log']) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'source', 'mac-address', mac_address]) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'inbound-interface', 'name', interface_in]) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'log']) self.cli_set(['firewall', 'bridge', 'name', name, 'rule', '1', 'log-options', 'level', 'crit']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'default-action', 'drop']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'default-log']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '1', 'action', 'accept']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '1', 'vlan', 'id', vlan_id]) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'action', 'jump']) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'jump-target', name]) self.cli_set(['firewall', 'bridge', 'forward', 'filter', 'rule', '2', 'vlan', 'priority', vlan_prior]) self.cli_commit() nftables_search = [ ['chain VYOS_FORWARD_filter'], ['type filter hook forward priority filter; policy accept;'], [f'vlan id {vlan_id}', 'accept'], [f'vlan pcp {vlan_prior}', f'jump NAME_{name}'], ['log prefix "[bri-FWD-filter-default-D]"', 'drop', 'FWD-filter default-action drop'], [f'chain NAME_{name}'], [f'ether saddr {mac_address}', f'iifname "{interface_in}"', f'log prefix "[bri-NAM-{name}-1-A]" log level crit', 'accept'], ['accept', f'{name} default-action accept'] ] self.verify_nftables(nftables_search, 'bridge vyos_filter') def test_source_validation(self): # Strict self.cli_set(['firewall', 'global-options', 'source-validation', 'strict']) self.cli_set(['firewall', 'global-options', 'ipv6-source-validation', 'strict']) self.cli_commit() nftables_strict_search = [ ['fib saddr . iif oif 0', 'drop'] ] self.verify_nftables_chain(nftables_strict_search, 'ip raw', 'vyos_global_rpfilter') self.verify_nftables_chain(nftables_strict_search, 'ip6 raw', 'vyos_global_rpfilter') # Loose self.cli_set(['firewall', 'global-options', 'source-validation', 'loose']) self.cli_set(['firewall', 'global-options', 'ipv6-source-validation', 'loose']) self.cli_commit() nftables_loose_search = [ ['fib saddr oif 0', 'drop'] ] self.verify_nftables_chain(nftables_loose_search, 'ip raw', 'vyos_global_rpfilter') self.verify_nftables_chain(nftables_loose_search, 'ip6 raw', 'vyos_global_rpfilter') def test_sysfs(self): for name, conf in sysfs_config.items(): paths = glob(conf['sysfs']) for path in paths: with open(path, 'r') as f: self.assertEqual(f.read().strip(), conf['default'], msg=path) self.cli_set(['firewall', 'global-options', name.replace("_", "-"), conf['test_value']]) self.cli_commit() for name, conf in sysfs_config.items(): paths = glob(conf['sysfs']) for path in paths: with open(path, 'r') as f: self.assertNotEqual(f.read().strip(), conf['default'], msg=path) ### Zone def test_zone_basic(self): self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'default-action', 'drop']) self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'interface', 'eth0']) self.cli_set(['firewall', 'zone', 'smoketest-eth0', 'from', 'smoketest-local', 'firewall', 'name', 'smoketest']) self.cli_set(['firewall', 'zone', 'smoketest-local', 'local-zone']) self.cli_set(['firewall', 'zone', 'smoketest-local', 'from', 'smoketest-eth0', 'firewall', 'name', 'smoketest']) self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'established', 'log']) self.cli_set(['firewall', 'global-options', 'state-policy', 'related', 'action', 'accept']) self.cli_set(['firewall', 'global-options', 'state-policy', 'invalid', 'action', 'drop']) self.cli_commit() nftables_search = [ ['chain VYOS_ZONE_FORWARD'], ['type filter hook forward priority filter + 1'], ['chain VYOS_ZONE_OUTPUT'], ['type filter hook output priority filter + 1'], ['chain VYOS_ZONE_LOCAL'], ['type filter hook input priority filter + 1'], ['chain VZONE_smoketest-eth0'], ['chain VZONE_smoketest-local_IN'], ['chain VZONE_smoketest-local_OUT'], ['oifname "eth0"', 'jump VZONE_smoketest-eth0'], ['jump VZONE_smoketest-local_IN'], ['jump VZONE_smoketest-local_OUT'], ['iifname "eth0"', 'jump NAME_smoketest'], ['oifname "eth0"', 'jump NAME_smoketest'], ['jump VYOS_STATE_POLICY'], ['chain VYOS_STATE_POLICY'], ['ct state established', 'log prefix "[STATE-POLICY-EST-A]"', 'accept'], ['ct state invalid', 'drop'], ['ct state related', 'accept'] ] nftables_output = cmd('sudo nft list table ip vyos_filter') for search in nftables_search: matched = False for line in nftables_output.split("\n"): if all(item in line for item in search): matched = True break self.assertTrue(matched) def test_flow_offload(self): self.cli_set(['firewall', 'flowtable', 'smoketest', 'interface', 'eth0']) self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'hardware']) # QEMU virtual NIC does not support hw-tc-offload with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'software']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'offload-target', 'smoketest']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'state', 'established']) self.cli_set(['firewall', 'ipv4', 'forward', 'filter', 'rule', '1', 'state', 'related']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'offload-target', 'smoketest']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'protocol', 'tcp_udp']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'state', 'established']) self.cli_set(['firewall', 'ipv6', 'forward', 'filter', 'rule', '1', 'state', 'related']) self.cli_commit() nftables_search = [ ['flowtable VYOS_FLOWTABLE_smoketest'], ['hook ingress priority filter'], ['devices = { eth0 }'], ['ct state { established, related }', 'meta l4proto { tcp, udp }', 'flow add @VYOS_FLOWTABLE_smoketest'], ] self.verify_nftables(nftables_search, 'ip vyos_filter') self.verify_nftables(nftables_search, 'ip6 vyos_filter') # Check conntrack self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['accept']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') def test_zone_flow_offload(self): self.cli_set(['firewall', 'flowtable', 'smoketest', 'interface', 'eth0']) self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'hardware']) # QEMU virtual NIC does not support hw-tc-offload with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(['firewall', 'flowtable', 'smoketest', 'offload', 'software']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv4', 'name', 'smoketest', 'rule', '1', 'offload-target', 'smoketest']) self.cli_set(['firewall', 'ipv6', 'name', 'smoketest', 'rule', '1', 'action', 'offload']) self.cli_set(['firewall', 'ipv6', 'name', 'smoketest', 'rule', '1', 'offload-target', 'smoketest']) self.cli_commit() nftables_search = [ ['chain NAME_smoketest'], ['flow add @VYOS_FLOWTABLE_smoketest'] ] self.verify_nftables(nftables_search, 'ip vyos_filter') nftables_search = [ ['chain NAME6_smoketest'], ['flow add @VYOS_FLOWTABLE_smoketest'] ] self.verify_nftables(nftables_search, 'ip6 vyos_filter') # Check conntrack self.verify_nftables_chain([['accept']], 'ip vyos_conntrack', 'FW_CONNTRACK') self.verify_nftables_chain([['accept']], 'ip6 vyos_conntrack', 'FW_CONNTRACK') if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/nat.py b/src/conf_mode/nat.py index 20570da62..19b206c59 100755 --- a/src/conf_mode/nat.py +++ b/src/conf_mode/nat.py @@ -1,240 +1,244 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import jmespath import json import os from sys import exit from netifaces import interfaces from vyos.base import Warning from vyos.config import Config from vyos.configdep import set_dependents, call_dependents from vyos.template import render from vyos.template import is_ip_network from vyos.utils.kernel import check_kmod from vyos.utils.dict import dict_search from vyos.utils.dict import dict_search_args from vyos.utils.process import cmd from vyos.utils.process import run from vyos.utils.network import is_addr_assigned from vyos import ConfigError from vyos import airbag airbag.enable() k_mod = ['nft_nat', 'nft_chain_nat'] nftables_nat_config = '/run/nftables_nat.conf' nftables_static_nat_conf = '/run/nftables_static-nat-rules.nft' valid_groups = [ 'address_group', 'domain_group', 'network_group', 'port_group' ] def get_config(config=None): if config: conf = config else: conf = Config() base = ['nat'] nat = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, with_recursive_defaults=True) set_dependents('conntrack', conf) if not conf.exists(base): nat['deleted'] = '' return nat nat['firewall_group'] = conf.get_config_dict(['firewall', 'group'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) + # Remove dynamic firewall groups if present: + if 'dynamic_group' in nat['firewall_group']: + del nat['firewall_group']['dynamic_group'] + return nat def verify_rule(config, err_msg, groups_dict): """ Common verify steps used for both source and destination NAT """ if (dict_search('translation.port', config) != None or dict_search('translation.redirect.port', config) != None or dict_search('destination.port', config) != None or dict_search('source.port', config)): if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']: raise ConfigError(f'{err_msg} ports can only be specified when '\ 'protocol is either tcp, udp or tcp_udp!') if is_ip_network(dict_search('translation.address', config)): raise ConfigError(f'{err_msg} cannot use ports with an IPv4 network as '\ 'translation address as it statically maps a whole network '\ 'of addresses onto another network of addresses!') for side in ['destination', 'source']: if side in config: side_conf = config[side] if len({'address', 'fqdn'} & set(side_conf)) > 1: raise ConfigError('Only one of address, fqdn or geoip can be specified') if 'group' in side_conf: if len({'address_group', 'network_group', 'domain_group'} & set(side_conf['group'])) > 1: raise ConfigError('Only one address-group, network-group or domain-group can be specified') for group in valid_groups: if group in side_conf['group']: group_name = side_conf['group'][group] error_group = group.replace("_", "-") if group in ['address_group', 'network_group', 'domain_group']: types = [t for t in ['address', 'fqdn'] if t in side_conf] if types: raise ConfigError(f'{error_group} and {types[0]} cannot both be defined') if group_name and group_name[0] == '!': group_name = group_name[1:] group_obj = dict_search_args(groups_dict, group, group_name) if group_obj is None: raise ConfigError(f'Invalid {error_group} "{group_name}" on nat rule') if not group_obj: Warning(f'{error_group} "{group_name}" has no members!') if dict_search_args(side_conf, 'group', 'port_group'): if 'protocol' not in config: raise ConfigError('Protocol must be defined if specifying a port-group') if config['protocol'] not in ['tcp', 'udp', 'tcp_udp']: raise ConfigError('Protocol must be tcp, udp, or tcp_udp when specifying a port-group') if 'load_balance' in config: for item in ['source-port', 'destination-port']: if item in config['load_balance']['hash'] and config['protocol'] not in ['tcp', 'udp']: raise ConfigError('Protocol must be tcp or udp when specifying hash ports') count = 0 if 'backend' in config['load_balance']: for member in config['load_balance']['backend']: weight = config['load_balance']['backend'][member]['weight'] count = count + int(weight) if count != 100: Warning(f'Sum of weight for nat load balance rule is not 100. You may get unexpected behaviour') def verify(nat): if not nat or 'deleted' in nat: # no need to verify the CLI as NAT is going to be deactivated return None if dict_search('source.rule', nat): for rule, config in dict_search('source.rule', nat).items(): err_msg = f'Source NAT configuration error in rule {rule}:' if 'outbound_interface' in config: if 'name' in config['outbound_interface'] and 'group' in config['outbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for nat source rule "{rule}"') elif 'name' in config['outbound_interface']: if config['outbound_interface']['name'] not in 'any' and config['outbound_interface']['name'] not in interfaces(): Warning(f'NAT interface "{config["outbound_interface"]["name"]}" for source NAT rule "{rule}" does not exist!') if not dict_search('translation.address', config) and not dict_search('translation.port', config): if 'exclude' not in config and 'backend' not in config['load_balance']: raise ConfigError(f'{err_msg} translation requires address and/or port') addr = dict_search('translation.address', config) if addr != None and addr != 'masquerade' and not is_ip_network(addr): for ip in addr.split('-'): if not is_addr_assigned(ip): Warning(f'IP address {ip} does not exist on the system!') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) if dict_search('destination.rule', nat): for rule, config in dict_search('destination.rule', nat).items(): err_msg = f'Destination NAT configuration error in rule {rule}:' if 'inbound_interface' in config: if 'name' in config['inbound_interface'] and 'group' in config['inbound_interface']: raise ConfigError(f'{err_msg} cannot specify both interface group and interface name for destination nat rule "{rule}"') elif 'name' in config['inbound_interface']: if config['inbound_interface']['name'] not in 'any' and config['inbound_interface']['name'] not in interfaces(): Warning(f'NAT interface "{config["inbound_interface"]["name"]}" for destination NAT rule "{rule}" does not exist!') if not dict_search('translation.address', config) and not dict_search('translation.port', config) and 'redirect' not in config['translation']: if 'exclude' not in config and 'backend' not in config['load_balance']: raise ConfigError(f'{err_msg} translation requires address and/or port') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) if dict_search('static.rule', nat): for rule, config in dict_search('static.rule', nat).items(): err_msg = f'Static NAT configuration error in rule {rule}:' if 'inbound_interface' not in config: raise ConfigError(f'{err_msg} inbound-interface not specified') # common rule verification verify_rule(config, err_msg, nat['firewall_group']) return None def generate(nat): if not os.path.exists(nftables_nat_config): nat['first_install'] = True render(nftables_nat_config, 'firewall/nftables-nat.j2', nat) render(nftables_static_nat_conf, 'firewall/nftables-static-nat.j2', nat) # dry-run newly generated configuration tmp = run(f'nft -c -f {nftables_nat_config}') if tmp > 0: raise ConfigError('Configuration file errors encountered!') tmp = run(f'nft -c -f {nftables_static_nat_conf}') if tmp > 0: raise ConfigError('Configuration file errors encountered!') return None def apply(nat): cmd(f'nft -f {nftables_nat_config}') cmd(f'nft -f {nftables_static_nat_conf}') if not nat or 'deleted' in nat: os.unlink(nftables_nat_config) os.unlink(nftables_static_nat_conf) call_dependents() return None if __name__ == '__main__': try: check_kmod(k_mod) c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/conf_mode/policy_route.py b/src/conf_mode/policy_route.py index adad012de..6d7a06714 100755 --- a/src/conf_mode/policy_route.py +++ b/src/conf_mode/policy_route.py @@ -1,195 +1,199 @@ #!/usr/bin/env python3 # # Copyright (C) 2021-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from json import loads from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.template import render from vyos.utils.dict import dict_search_args from vyos.utils.process import cmd from vyos.utils.process import run from vyos import ConfigError from vyos import airbag airbag.enable() mark_offset = 0x7FFFFFFF nftables_conf = '/run/nftables_policy.conf' valid_groups = [ 'address_group', 'domain_group', 'network_group', 'port_group', 'interface_group' ] def get_config(config=None): if config: conf = config else: conf = Config() base = ['policy'] policy = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) policy['firewall_group'] = conf.get_config_dict(['firewall', 'group'], key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) + # Remove dynamic firewall groups if present: + if 'dynamic_group' in policy['firewall_group']: + del policy['firewall_group']['dynamic_group'] + return policy def verify_rule(policy, name, rule_conf, ipv6, rule_id): icmp = 'icmp' if not ipv6 else 'icmpv6' if icmp in rule_conf: icmp_defined = False if 'type_name' in rule_conf[icmp]: icmp_defined = True if 'code' in rule_conf[icmp] or 'type' in rule_conf[icmp]: raise ConfigError(f'{name} rule {rule_id}: Cannot use ICMP type/code with ICMP type-name') if 'code' in rule_conf[icmp]: icmp_defined = True if 'type' not in rule_conf[icmp]: raise ConfigError(f'{name} rule {rule_id}: ICMP code can only be defined if ICMP type is defined') if 'type' in rule_conf[icmp]: icmp_defined = True if icmp_defined and 'protocol' not in rule_conf or rule_conf['protocol'] != icmp: raise ConfigError(f'{name} rule {rule_id}: ICMP type/code or type-name can only be defined if protocol is ICMP') if 'set' in rule_conf: if 'tcp_mss' in rule_conf['set']: tcp_flags = dict_search_args(rule_conf, 'tcp', 'flags') if not tcp_flags or 'syn' not in tcp_flags: raise ConfigError(f'{name} rule {rule_id}: TCP SYN flag must be set to modify TCP-MSS') tcp_flags = dict_search_args(rule_conf, 'tcp', 'flags') if tcp_flags: if dict_search_args(rule_conf, 'protocol') != 'tcp': raise ConfigError('Protocol must be tcp when specifying tcp flags') not_flags = dict_search_args(rule_conf, 'tcp', 'flags', 'not') if not_flags: duplicates = [flag for flag in tcp_flags if flag in not_flags] if duplicates: raise ConfigError(f'Cannot match a tcp flag as set and not set') for side in ['destination', 'source']: if side in rule_conf: side_conf = rule_conf[side] if 'group' in side_conf: if len({'address_group', 'domain_group', 'network_group'} & set(side_conf['group'])) > 1: raise ConfigError('Only one address-group, domain-group or network-group can be specified') for group in valid_groups: if group in side_conf['group']: group_name = side_conf['group'][group] if group_name.startswith('!'): group_name = group_name[1:] fw_group = f'ipv6_{group}' if ipv6 and group in ['address_group', 'network_group'] else group error_group = fw_group.replace("_", "-") group_obj = dict_search_args(policy['firewall_group'], fw_group, group_name) if group_obj is None: raise ConfigError(f'Invalid {error_group} "{group_name}" on policy route rule') if not group_obj: Warning(f'{error_group} "{group_name}" has no members') if 'port' in side_conf or dict_search_args(side_conf, 'group', 'port_group'): if 'protocol' not in rule_conf: raise ConfigError('Protocol must be defined if specifying a port or port-group') if rule_conf['protocol'] not in ['tcp', 'udp', 'tcp_udp']: raise ConfigError('Protocol must be tcp, udp, or tcp_udp when specifying a port or port-group') def verify(policy): for route in ['route', 'route6']: ipv6 = route == 'route6' if route in policy: for name, pol_conf in policy[route].items(): if 'rule' in pol_conf: for rule_id, rule_conf in pol_conf['rule'].items(): verify_rule(policy, name, rule_conf, ipv6, rule_id) return None def generate(policy): if not os.path.exists(nftables_conf): policy['first_install'] = True render(nftables_conf, 'firewall/nftables-policy.j2', policy) return None def apply_table_marks(policy): for route in ['route', 'route6']: if route in policy: cmd_str = 'ip' if route == 'route' else 'ip -6' tables = [] for name, pol_conf in policy[route].items(): if 'rule' in pol_conf: for rule_id, rule_conf in pol_conf['rule'].items(): set_table = dict_search_args(rule_conf, 'set', 'table') if set_table: if set_table == 'main': set_table = '254' if set_table in tables: continue tables.append(set_table) table_mark = mark_offset - int(set_table) cmd(f'{cmd_str} rule add pref {set_table} fwmark {table_mark} table {set_table}') def cleanup_table_marks(): for cmd_str in ['ip', 'ip -6']: json_rules = cmd(f'{cmd_str} -j -N rule list') rules = loads(json_rules) for rule in rules: if 'fwmark' not in rule or 'table' not in rule: continue fwmark = rule['fwmark'] table = int(rule['table']) if fwmark[:2] == '0x': fwmark = int(fwmark, 16) if (int(fwmark) == (mark_offset - table)): cmd(f'{cmd_str} rule del fwmark {fwmark} table {table}') def apply(policy): install_result = run(f'nft -f {nftables_conf}') if install_result == 1: raise ConfigError('Failed to apply policy based routing') if 'first_install' not in policy: cleanup_table_marks() apply_table_marks(policy) return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1) diff --git a/src/op_mode/firewall.py b/src/op_mode/firewall.py index 36bb013fe..4dcffc412 100755 --- a/src/op_mode/firewall.py +++ b/src/op_mode/firewall.py @@ -1,538 +1,559 @@ #!/usr/bin/env python3 # # Copyright (C) 2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import argparse import ipaddress import json import re import tabulate from vyos.config import Config from vyos.utils.process import cmd from vyos.utils.dict import dict_search_args def get_config_node(conf, node=None, family=None, hook=None, priority=None): if node == 'nat': if family == 'ipv6': config_path = ['nat66'] else: config_path = ['nat'] elif node == 'policy': config_path = ['policy'] else: config_path = ['firewall'] if family: config_path += [family] if hook: config_path += [hook] if priority: config_path += [priority] node_config = conf.get_config_dict(config_path, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) return node_config def get_nftables_details(family, hook, priority): if family == 'ipv6': suffix = 'ip6' name_prefix = 'NAME6_' aux='IPV6_' elif family == 'ipv4': suffix = 'ip' name_prefix = 'NAME_' aux='' else: suffix = 'bridge' name_prefix = 'NAME_' aux='' if hook == 'name' or hook == 'ipv6-name': command = f'sudo nft list chain {suffix} vyos_filter {name_prefix}{priority}' else: up_hook = hook.upper() command = f'sudo nft list chain {suffix} vyos_filter VYOS_{aux}{up_hook}_{priority}' try: results = cmd(command) except: return {} out = {} for line in results.split('\n'): comment_search = re.search(rf'{priority}[\- ](\d+|default-action)', line) if not comment_search: continue rule = {} rule_id = comment_search[1] counter_search = re.search(r'counter packets (\d+) bytes (\d+)', line) if counter_search: rule['packets'] = counter_search[1] rule['bytes'] = counter_search[2] rule['conditions'] = re.sub(r'(\b(counter packets \d+ bytes \d+|drop|reject|return|log)\b|comment "[\w\-]+")', '', line).strip() out[rule_id] = rule return out def output_firewall_name(family, hook, priority, firewall_conf, single_rule_id=None): print(f'\n---------------------------------\n{family} Firewall "{hook} {priority}"\n') details = get_nftables_details(family, hook, priority) rows = [] if 'rule' in firewall_conf: for rule_id, rule_conf in firewall_conf['rule'].items(): if single_rule_id and rule_id != single_rule_id: continue if 'disable' in rule_conf: continue row = [rule_id, rule_conf['action'], rule_conf['protocol'] if 'protocol' in rule_conf else 'all'] if rule_id in details: rule_details = details[rule_id] row.append(rule_details.get('packets', 0)) row.append(rule_details.get('bytes', 0)) row.append(rule_details['conditions']) rows.append(row) if hook in ['input', 'forward', 'output']: def_action = firewall_conf['default_action'] if 'default_action' in firewall_conf else 'accept' else: def_action = firewall_conf['default_action'] if 'default_action' in firewall_conf else 'drop' row = ['default', def_action, 'all'] rule_details = details['default-action'] row.append(rule_details.get('packets', 0)) row.append(rule_details.get('bytes', 0)) rows.append(row) if rows: header = ['Rule', 'Action', 'Protocol', 'Packets', 'Bytes', 'Conditions'] print(tabulate.tabulate(rows, header) + '\n') def output_firewall_name_statistics(family, hook, prior, prior_conf, single_rule_id=None): print(f'\n---------------------------------\n{family} Firewall "{hook} {prior}"\n') details = get_nftables_details(family, hook, prior) rows = [] if 'rule' in prior_conf: for rule_id, rule_conf in prior_conf['rule'].items(): if single_rule_id and rule_id != single_rule_id: continue if 'disable' in rule_conf: continue # Get source source_addr = dict_search_args(rule_conf, 'source', 'address') if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'group', 'address_group') if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'group', 'network_group') if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'group', 'domain_group') if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'fqdn') if not source_addr: source_addr = dict_search_args(rule_conf, 'source', 'geoip', 'country_code') if source_addr: source_addr = str(source_addr)[1:-1].replace('\'','') if 'inverse_match' in dict_search_args(rule_conf, 'source', 'geoip'): source_addr = 'NOT ' + str(source_addr) if not source_addr: source_addr = 'any' # Get destination dest_addr = dict_search_args(rule_conf, 'destination', 'address') if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'address_group') if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'network_group') if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'group', 'domain_group') if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'fqdn') if not dest_addr: dest_addr = dict_search_args(rule_conf, 'destination', 'geoip', 'country_code') if dest_addr: dest_addr = str(dest_addr)[1:-1].replace('\'','') if 'inverse_match' in dict_search_args(rule_conf, 'destination', 'geoip'): dest_addr = 'NOT ' + str(dest_addr) if not dest_addr: dest_addr = 'any' # Get inbound interface iiface = dict_search_args(rule_conf, 'inbound_interface', 'name') if not iiface: iiface = dict_search_args(rule_conf, 'inbound_interface', 'group') if not iiface: iiface = 'any' # Get outbound interface oiface = dict_search_args(rule_conf, 'outbound_interface', 'name') if not oiface: oiface = dict_search_args(rule_conf, 'outbound_interface', 'group') if not oiface: oiface = 'any' row = [rule_id] if rule_id in details: rule_details = details[rule_id] row.append(rule_details.get('packets', 0)) row.append(rule_details.get('bytes', 0)) else: row.append('0') row.append('0') row.append(rule_conf['action']) row.append(source_addr) row.append(dest_addr) row.append(iiface) row.append(oiface) rows.append(row) if hook in ['input', 'forward', 'output']: row = ['default'] rule_details = details['default-action'] row.append(rule_details.get('packets', 0)) row.append(rule_details.get('bytes', 0)) if 'default_action' in prior_conf: row.append(prior_conf['default_action']) else: row.append('accept') row.append('any') row.append('any') row.append('any') row.append('any') rows.append(row) elif 'default_action' in prior_conf and not single_rule_id: row = ['default'] if 'default-action' in details: rule_details = details['default-action'] row.append(rule_details.get('packets', 0)) row.append(rule_details.get('bytes', 0)) else: row.append('0') row.append('0') row.append(prior_conf['default_action']) row.append('any') # Source row.append('any') # Dest row.append('any') # inbound-interface row.append('any') # outbound-interface rows.append(row) if rows: header = ['Rule', 'Packets', 'Bytes', 'Action', 'Source', 'Destination', 'Inbound-Interface', 'Outbound-interface'] print(tabulate.tabulate(rows, header) + '\n') def show_firewall(): print('Rulesets Information') conf = Config() firewall = get_config_node(conf) if not firewall: return for family in ['ipv4', 'ipv6', 'bridge']: if family in firewall: for hook, hook_conf in firewall[family].items(): for prior, prior_conf in firewall[family][hook].items(): output_firewall_name(family, hook, prior, prior_conf) def show_firewall_family(family): print(f'Rulesets {family} Information') conf = Config() firewall = get_config_node(conf) if not firewall or family not in firewall: return for hook, hook_conf in firewall[family].items(): for prior, prior_conf in firewall[family][hook].items(): output_firewall_name(family, hook, prior, prior_conf) def show_firewall_name(family, hook, priority): print('Ruleset Information') conf = Config() firewall = get_config_node(conf, 'firewall', family, hook, priority) if firewall: output_firewall_name(family, hook, priority, firewall) def show_firewall_rule(family, hook, priority, rule_id): print('Rule Information') conf = Config() firewall = get_config_node(conf, 'firewall', family, hook, priority) if firewall: output_firewall_name(family, hook, priority, firewall, rule_id) def show_firewall_group(name=None): conf = Config() firewall = get_config_node(conf, node='firewall') if 'group' not in firewall: return nat = get_config_node(conf, node='nat') policy = get_config_node(conf, node='policy') def find_references(group_type, group_name): out = [] family = [] if group_type in ['address_group', 'network_group']: family = ['ipv4'] elif group_type == 'ipv6_address_group': family = ['ipv6'] group_type = 'address_group' elif group_type == 'ipv6_network_group': family = ['ipv6'] group_type = 'network_group' else: family = ['ipv4', 'ipv6', 'bridge'] for item in family: # Look references in firewall for name_type in ['name', 'ipv6_name', 'forward', 'input', 'output']: if item in firewall: if name_type not in firewall[item]: continue for priority, priority_conf in firewall[item][name_type].items(): if priority not in firewall[item][name_type]: continue if 'rule' not in priority_conf: continue for rule_id, rule_conf in priority_conf['rule'].items(): source_group = dict_search_args(rule_conf, 'source', 'group', group_type) dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type) in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group') out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group') + dyn_group_source = dict_search_args(rule_conf, 'add_address_to_group', 'source_address', group_type) + dyn_group_dst = dict_search_args(rule_conf, 'add_address_to_group', 'destination_address', group_type) if source_group: if source_group[0] == "!": source_group = source_group[1:] if group_name == source_group: out.append(f'{item}-{name_type}-{priority}-{rule_id}') if dest_group: if dest_group[0] == "!": dest_group = dest_group[1:] if group_name == dest_group: out.append(f'{item}-{name_type}-{priority}-{rule_id}') if in_interface: if in_interface[0] == "!": in_interface = in_interface[1:] if group_name == in_interface: out.append(f'{item}-{name_type}-{priority}-{rule_id}') if out_interface: if out_interface[0] == "!": out_interface = out_interface[1:] if group_name == out_interface: out.append(f'{item}-{name_type}-{priority}-{rule_id}') + if dyn_group_source: + if group_name == dyn_group_source: + out.append(f'{item}-{name_type}-{priority}-{rule_id}') + if dyn_group_dst: + if group_name == dyn_group_dst: + out.append(f'{item}-{name_type}-{priority}-{rule_id}') + + # Look references in route | route6 for name_type in ['route', 'route6']: if name_type not in policy: continue if name_type == 'route' and item == 'ipv6': continue elif name_type == 'route6' and item == 'ipv4': continue else: for policy_name, policy_conf in policy[name_type].items(): if 'rule' not in policy_conf: continue for rule_id, rule_conf in policy_conf['rule'].items(): source_group = dict_search_args(rule_conf, 'source', 'group', group_type) dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type) in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group') out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group') if source_group: if source_group[0] == "!": source_group = source_group[1:] if group_name == source_group: out.append(f'{name_type}-{policy_name}-{rule_id}') if dest_group: if dest_group[0] == "!": dest_group = dest_group[1:] if group_name == dest_group: out.append(f'{name_type}-{policy_name}-{rule_id}') if in_interface: if in_interface[0] == "!": in_interface = in_interface[1:] if group_name == in_interface: out.append(f'{name_type}-{policy_name}-{rule_id}') if out_interface: if out_interface[0] == "!": out_interface = out_interface[1:] if group_name == out_interface: out.append(f'{name_type}-{policy_name}-{rule_id}') ## Look references in nat table for direction in ['source', 'destination']: if direction in nat: if 'rule' not in nat[direction]: continue for rule_id, rule_conf in nat[direction]['rule'].items(): source_group = dict_search_args(rule_conf, 'source', 'group', group_type) dest_group = dict_search_args(rule_conf, 'destination', 'group', group_type) in_interface = dict_search_args(rule_conf, 'inbound_interface', 'group') out_interface = dict_search_args(rule_conf, 'outbound_interface', 'group') if source_group: if source_group[0] == "!": source_group = source_group[1:] if group_name == source_group: out.append(f'nat-{direction}-{rule_id}') if dest_group: if dest_group[0] == "!": dest_group = dest_group[1:] if group_name == dest_group: out.append(f'nat-{direction}-{rule_id}') if in_interface: if in_interface[0] == "!": in_interface = in_interface[1:] if group_name == in_interface: out.append(f'nat-{direction}-{rule_id}') if out_interface: if out_interface[0] == "!": out_interface = out_interface[1:] if group_name == out_interface: out.append(f'nat-{direction}-{rule_id}') return out header = ['Name', 'Type', 'References', 'Members'] rows = [] for group_type, group_type_conf in firewall['group'].items(): - for group_name, group_conf in group_type_conf.items(): - if name and name != group_name: - continue + ## + if group_type != 'dynamic_group': - references = find_references(group_type, group_name) - row = [group_name, group_type, '\n'.join(references) or 'N/D'] - if 'address' in group_conf: - row.append("\n".join(sorted(group_conf['address']))) - elif 'network' in group_conf: - row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) - elif 'mac_address' in group_conf: - row.append("\n".join(sorted(group_conf['mac_address']))) - elif 'port' in group_conf: - row.append("\n".join(sorted(group_conf['port']))) - elif 'interface' in group_conf: - row.append("\n".join(sorted(group_conf['interface']))) - else: - row.append('N/D') - rows.append(row) + for group_name, group_conf in group_type_conf.items(): + if name and name != group_name: + continue + references = find_references(group_type, group_name) + row = [group_name, group_type, '\n'.join(references) or 'N/D'] + if 'address' in group_conf: + row.append("\n".join(sorted(group_conf['address']))) + elif 'network' in group_conf: + row.append("\n".join(sorted(group_conf['network'], key=ipaddress.ip_network))) + elif 'mac_address' in group_conf: + row.append("\n".join(sorted(group_conf['mac_address']))) + elif 'port' in group_conf: + row.append("\n".join(sorted(group_conf['port']))) + elif 'interface' in group_conf: + row.append("\n".join(sorted(group_conf['interface']))) + else: + row.append('N/D') + rows.append(row) + + else: + for dynamic_type in ['address_group', 'ipv6_address_group']: + if dynamic_type in firewall['group']['dynamic_group']: + for dynamic_name, dynamic_conf in firewall['group']['dynamic_group'][dynamic_type].items(): + references = find_references(dynamic_type, dynamic_name) + row = [dynamic_name, dynamic_type + '(dynamic)', '\n'.join(references) or 'N/D'] + row.append('N/D') + rows.append(row) if rows: print('Firewall Groups\n') print(tabulate.tabulate(rows, header)) def show_summary(): print('Ruleset Summary') conf = Config() firewall = get_config_node(conf) if not firewall: return header = ['Ruleset Hook', 'Ruleset Priority', 'Description', 'References'] v4_out = [] v6_out = [] br_out = [] if 'ipv4' in firewall: for hook, hook_conf in firewall['ipv4'].items(): for prior, prior_conf in firewall['ipv4'][hook].items(): description = prior_conf.get('description', '') v4_out.append([hook, prior, description]) if 'ipv6' in firewall: for hook, hook_conf in firewall['ipv6'].items(): for prior, prior_conf in firewall['ipv6'][hook].items(): description = prior_conf.get('description', '') v6_out.append([hook, prior, description]) if 'bridge' in firewall: for hook, hook_conf in firewall['bridge'].items(): for prior, prior_conf in firewall['bridge'][hook].items(): description = prior_conf.get('description', '') br_out.append([hook, prior, description]) if v6_out: print('\nIPv6 Ruleset:\n') print(tabulate.tabulate(v6_out, header) + '\n') if v4_out: print('\nIPv4 Ruleset:\n') print(tabulate.tabulate(v4_out, header) + '\n') if br_out: print('\nBridge Ruleset:\n') print(tabulate.tabulate(br_out, header) + '\n') show_firewall_group() def show_statistics(): print('Rulesets Statistics') conf = Config() firewall = get_config_node(conf) if not firewall: return for family in ['ipv4', 'ipv6', 'bridge']: if family in firewall: for hook, hook_conf in firewall[family].items(): for prior, prior_conf in firewall[family][hook].items(): output_firewall_name_statistics(family, hook,prior, prior_conf) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--action', help='Action', required=False) parser.add_argument('--name', help='Firewall name', required=False, action='store', nargs='?', default='') parser.add_argument('--family', help='IP family', required=False, action='store', nargs='?', default='') parser.add_argument('--hook', help='Firewall hook', required=False, action='store', nargs='?', default='') parser.add_argument('--priority', help='Firewall priority', required=False, action='store', nargs='?', default='') parser.add_argument('--rule', help='Firewall Rule ID', required=False) parser.add_argument('--ipv6', help='IPv6 toggle', action='store_true') args = parser.parse_args() if args.action == 'show': if not args.rule: show_firewall_name(args.family, args.hook, args.priority) else: show_firewall_rule(args.family, args.hook, args.priority, args.rule) elif args.action == 'show_all': show_firewall() elif args.action == 'show_family': show_firewall_family(args.family) elif args.action == 'show_group': show_firewall_group(args.name) elif args.action == 'show_statistics': show_statistics() elif args.action == 'show_summary': show_summary()