diff --git a/changelogs/fragments/fix_issue170_vyos_firewall_rules.yaml b/changelogs/fragments/fix_issue170_vyos_firewall_rules.yaml new file mode 100644 index 0000000..aed026c --- /dev/null +++ b/changelogs/fragments/fix_issue170_vyos_firewall_rules.yaml @@ -0,0 +1,3 @@ +--- +bugfixes: + - fix issue in firewall rules facts code when IPV6 ICMP type name in vyos.vyos.vyos_firewall_rules is not idempotent diff --git a/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py b/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py index 4424292..63a159e 100644 --- a/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py +++ b/plugins/module_utils/network/vyos/facts/firewall_rules/firewall_rules.py @@ -1,379 +1,384 @@ # # -*- coding: utf-8 -*- # Copyright 2019 Red Hat # GNU General Public License v3.0+ # (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) """ The vyos firewall_rules fact class It is in this file the configuration is collected from the device for a given resource, parsed, and the facts tree is populated based on the configuration. """ from __future__ import absolute_import, division, print_function __metaclass__ = type +import re from re import findall, search, M from copy import deepcopy from ansible_collections.ansible.netcommon.plugins.module_utils.network.common import ( utils, ) from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.argspec.firewall_rules.firewall_rules import ( Firewall_rulesArgs, ) class Firewall_rulesFacts(object): """The vyos firewall_rules fact class""" def __init__(self, module, subspec="config", options="options"): self._module = module self.argument_spec = Firewall_rulesArgs.argument_spec spec = deepcopy(self.argument_spec) if subspec: if options: facts_argument_spec = spec[subspec][options] else: facts_argument_spec = spec[subspec] else: facts_argument_spec = spec self.generated_spec = utils.generate_dict(facts_argument_spec) def get_device_data(self, connection): return connection.get_config() def populate_facts(self, connection, ansible_facts, data=None): """Populate the facts for firewall_rules :param connection: the device connection :param ansible_facts: Facts dictionary :param data: previously collected conf :rtype: dictionary :returns: facts """ if not data: # typically data is populated from the current device configuration # data = connection.get('show running-config | section ^interface') # using mock data instead data = self.get_device_data(connection) # split the config into instances of the resource objs = [] v6_rules = findall( r"^set firewall ipv6-name (?:\'*)(\S+)(?:\'*)", data, M ) v4_rules = findall(r"^set firewall name (?:\'*)(\S+)(?:\'*)", data, M) if v6_rules: config = self.get_rules(data, v6_rules, type="ipv6") if config: config = utils.remove_empties(config) objs.append(config) if v4_rules: config = self.get_rules(data, v4_rules, type="ipv4") if config: config = utils.remove_empties(config) objs.append(config) ansible_facts["ansible_network_resources"].pop("firewall_rules", None) facts = {} if objs: facts["firewall_rules"] = [] params = utils.validate_config( self.argument_spec, {"config": objs} ) for cfg in params["config"]: facts["firewall_rules"].append(utils.remove_empties(cfg)) ansible_facts["ansible_network_resources"].update(facts) return ansible_facts def get_rules(self, data, rules, type): """ This function performs following: - Form regex to fetch 'rule-sets' specific config from data. - Form the rule-set list based on ip address. :param data: configuration. :param rules: list of rule-sets. :param type: ip address type. :return: generated rule-sets configuration. """ r_v4 = [] r_v6 = [] for r in set(rules): rule_regex = r" %s .+$" % r.strip("'") cfg = findall(rule_regex, data, M) fr = self.render_config(cfg, r.strip("'")) fr["name"] = r.strip("'") if type == "ipv6": r_v6.append(fr) else: r_v4.append(fr) if r_v4: config = {"afi": "ipv4", "rule_sets": r_v4} if r_v6: config = {"afi": "ipv6", "rule_sets": r_v6} return config def render_config(self, conf, match): """ Render config as dictionary structure and delete keys from spec for null values :param spec: The facts tree, generated from the argspec :param conf: The configuration :rtype: dictionary :returns: The generated config """ conf = "\n".join(filter(lambda x: x, conf)) a_lst = ["description", "default_action", "enable_default_log"] config = self.parse_attr(conf, a_lst, match) if not config: config = {} config["rules"] = self.parse_rules_lst(conf) return config def parse_rules_lst(self, conf): """ This function forms the regex to fetch the 'rules' with in 'rule-sets' :param conf: configuration data. :return: generated rule list configuration. """ r_lst = [] rules = findall(r"rule (?:\'*)(\d+)(?:\'*)", conf, M) if rules: rules_lst = [] for r in set(rules): r_regex = r" %s .+$" % r cfg = "\n".join(findall(r_regex, conf, M)) obj = self.parse_rules(cfg) obj["number"] = int(r) if obj: rules_lst.append(obj) r_lst = sorted(rules_lst, key=lambda i: i["number"]) return r_lst def parse_rules(self, conf): """ This function triggers the parsing of 'rule' attributes. a_lst is a list having rule attributes which doesn't have further sub attributes. :param conf: configuration :return: generated rule configuration dictionary. """ a_lst = [ "ipsec", "action", "protocol", "fragment", "disabled", "description", + "icmp", ] rule = self.parse_attr(conf, a_lst) r_sub = { "p2p": self.parse_p2p(conf), "tcp": self.parse_tcp(conf, "tcp"), "icmp": self.parse_icmp(conf, "icmp"), "time": self.parse_time(conf, "time"), "limit": self.parse_limit(conf, "limit"), "state": self.parse_state(conf, "state"), "recent": self.parse_recent(conf, "recent"), "source": self.parse_src_or_dest(conf, "source"), "destination": self.parse_src_or_dest(conf, "destination"), } rule.update(r_sub) return rule def parse_p2p(self, conf): """ This function forms the regex to fetch the 'p2p' with in 'rules' :param conf: configuration data. :return: generated rule list configuration. """ a_lst = [] applications = findall(r"p2p (?:\'*)(\d+)(?:\'*)", conf, M) if applications: app_lst = [] for r in set(applications): obj = {"application": r.strip("'")} app_lst.append(obj) a_lst = sorted(app_lst, key=lambda i: i["application"]) return a_lst def parse_src_or_dest(self, conf, attrib=None): """ This function triggers the parsing of 'source or destination' attributes. :param conf: configuration. :param attrib:'source/destination'. :return:generated source/destination configuration dictionary. """ a_lst = ["port", "address", "mac_address"] cfg_dict = self.parse_attr(conf, a_lst, match=attrib) cfg_dict["group"] = self.parse_group(conf, attrib + " group") return cfg_dict def parse_recent(self, conf, attrib=None): """ This function triggers the parsing of 'recent' attributes :param conf: configuration. :param attrib: 'recent'. :return: generated config dictionary. """ a_lst = ["time", "count"] cfg_dict = self.parse_attr(conf, a_lst, match=attrib) return cfg_dict def parse_tcp(self, conf, attrib=None): """ This function triggers the parsing of 'tcp' attributes. :param conf: configuration. :param attrib: 'tcp'. :return: generated config dictionary. """ cfg_dict = self.parse_attr(conf, ["flags"], match=attrib) return cfg_dict def parse_time(self, conf, attrib=None): """ This function triggers the parsing of 'time' attributes. :param conf: configuration. :param attrib: 'time'. :return: generated config dictionary. """ a_lst = [ "stopdate", "stoptime", "weekdays", "monthdays", "startdate", "starttime", ] cfg_dict = self.parse_attr(conf, a_lst, match=attrib) return cfg_dict def parse_state(self, conf, attrib=None): """ This function triggers the parsing of 'state' attributes. :param conf: configuration :param attrib: 'state'. :return: generated config dictionary. """ a_lst = ["new", "invalid", "related", "established"] cfg_dict = self.parse_attr(conf, a_lst, match=attrib) return cfg_dict def parse_group(self, conf, attrib=None): """ This function triggers the parsing of 'group' attributes. :param conf: configuration. :param attrib: 'group'. :return: generated config dictionary. """ a_lst = ["port_group", "address_group", "network_group"] cfg_dict = self.parse_attr(conf, a_lst, match=attrib) return cfg_dict def parse_icmp(self, conf, attrib=None): """ This function triggers the parsing of 'icmp' attributes. :param conf: configuration to be parsed. :param attrib: 'icmp'. :return: generated config dictionary. """ a_lst = ["code", "type", "type_name"] + if attrib == "icmp": + attrib = "icmpv6" + conf = re.sub("icmpv6 type", "icmpv6 type-name", conf) cfg_dict = self.parse_attr(conf, a_lst, match=attrib) return cfg_dict def parse_limit(self, conf, attrib=None): """ This function triggers the parsing of 'limit' attributes. :param conf: configuration to be parsed. :param attrib: 'limit' :return: generated config dictionary. """ cfg_dict = self.parse_attr(conf, ["burst"], match=attrib) cfg_dict["rate"] = self.parse_rate(conf, "rate") return cfg_dict def parse_rate(self, conf, attrib=None): """ This function triggers the parsing of 'rate' attributes. :param conf: configuration. :param attrib: 'rate' :return: generated config dictionary. """ a_lst = ["unit", "number"] cfg_dict = self.parse_attr(conf, a_lst, match=attrib) return cfg_dict def parse_attr(self, conf, attr_list, match=None): """ This function peforms the following: - Form the regex to fetch the required attribute config. - Type cast the output in desired format. :param conf: configuration. :param attr_list: list of attributes. :param match: parent node/attribute name. :return: generated config dictionary. """ config = {} for attrib in attr_list: regex = self.map_regex(attrib) if match: regex = match + " " + regex if conf: if self.is_bool(attrib): out = conf.find(attrib.replace("_", "-")) dis = conf.find(attrib.replace("_", "-") + " 'disable'") if out >= 1: if dis >= 1: config[attrib] = False else: config[attrib] = True else: out = search(r"^.*" + regex + " (.+)", conf, M) if out: val = out.group(1).strip("'") if self.is_num(attrib): val = int(val) config[attrib] = val return config def map_regex(self, attrib): """ - This function construct the regex string. - replace the underscore with hyphen. :param attrib: attribute :return: regex string """ regex = attrib.replace("_", "-") if attrib == "disabled": regex = "disable" return regex def is_bool(self, attrib): """ This function looks for the attribute in predefined bool type set. :param attrib: attribute. :return: True/False """ bool_set = ( "new", "invalid", "related", "disabled", "established", "enable_default_log", ) return True if attrib in bool_set else False def is_num(self, attrib): """ This function looks for the attribute in predefined integer type set. :param attrib: attribute. :return: True/false. """ num_set = ("time", "code", "type", "count", "burst", "number") return True if attrib in num_set else False diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg index f65b386..8726301 100644 --- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg +++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg @@ -1,13 +1,15 @@ set firewall name V4-INGRESS default-action 'accept' set firewall ipv6-name V6-INGRESS default-action 'accept' set firewall name V4-INGRESS description 'This is IPv4 V4-INGRESS rule set' set firewall name V4-INGRESS enable-default-log set firewall name V4-INGRESS rule 101 protocol 'icmp' set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible' set firewall name V4-INGRESS rule 101 fragment 'match-frag' set firewall name V4-INGRESS rule 101 set firewall name V4-INGRESS rule 101 disabled set firewall name V4-INGRESS rule 101 action 'accept' set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec' set firewall name V4-EGRESS default-action 'reject' set firewall ipv6-name V6-EGRESS default-action 'reject' +set firewall ipv6-name V6-EGRESS rule 20 +set firewall ipv6-name V6-EGRESS rule 20 icmpv6 type 'echo-request' \ No newline at end of file diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py b/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py index 682b2da..520446e 100644 --- a/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py +++ b/tests/unit/modules/network/vyos/test_vyos_firewall_rules.py @@ -1,1080 +1,1112 @@ # (c) 2016 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # Make coding more python3-ish from __future__ import absolute_import, division, print_function __metaclass__ = type from ansible_collections.vyos.vyos.tests.unit.compat.mock import patch from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_rules from ansible_collections.vyos.vyos.tests.unit.modules.utils import ( set_module_args, ) from .vyos_module import TestVyosModule, load_fixture class TestVyosFirewallRulesModule(TestVyosModule): module = vyos_firewall_rules def setUp(self): super(TestVyosFirewallRulesModule, self).setUp() self.mock_get_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config" ) self.get_config = self.mock_get_config.start() self.mock_load_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config" ) self.load_config = self.mock_load_config.start() self.mock_get_resource_connection_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection" ) self.get_resource_connection_config = ( self.mock_get_resource_connection_config.start() ) self.mock_get_resource_connection_facts = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection" ) self.get_resource_connection_facts = ( self.mock_get_resource_connection_facts.start() ) self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data" ) self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data" ) self.execute_show_command = self.mock_execute_show_command.start() def tearDown(self): super(TestVyosFirewallRulesModule, self).tearDown() self.mock_get_resource_connection_config.stop() self.mock_get_resource_connection_facts.stop() self.mock_get_config.stop() self.mock_load_config.stop() self.mock_execute_show_command.stop() def load_fixtures(self, commands=None): def load_from_file(*args, **kwargs): return load_fixture("vyos_firewall_rules_config.cfg") self.execute_show_command.side_effect = load_from_file def test_vyos_firewall_rule_set_01_merged(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="V6-INBOUND", description="This is IPv6 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V6-OUTBOUND", description="This is IPv6 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), dict( afi="ipv4", rule_sets=[ dict( name="V4-INBOUND", description="This is IPv4 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V4-OUTBOUND", description="This is IPv4 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), ], state="merged", ) ) commands = [ "set firewall ipv6-name V6-INBOUND default-action 'reject'", "set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6-name V6-INBOUND enable-default-log", "set firewall ipv6-name V6-OUTBOUND default-action 'accept'", "set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'", "set firewall name V4-INBOUND default-action 'reject'", "set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall name V4-INBOUND enable-default-log", "set firewall name V4-OUTBOUND default-action 'accept'", "set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_rule_set_02_merged(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="V6-INBOUND", description="This is IPv6 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V6-OUTBOUND", description="This is IPv6 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), dict( afi="ipv4", rule_sets=[ dict( name="V4-INBOUND", description="This is IPv4 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V4-OUTBOUND", description="This is IPv4 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), ], state="merged", ) ) commands = [ "set firewall ipv6-name V6-INBOUND default-action 'reject'", "set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6-name V6-INBOUND enable-default-log", "set firewall ipv6-name V6-OUTBOUND default-action 'accept'", "set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'", "set firewall name V4-INBOUND default-action 'reject'", "set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall name V4-INBOUND enable-default-log", "set firewall name V4-OUTBOUND default-action 'accept'", "set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", description="This is IPv4 INBOUND rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disabled=True, ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall name INBOUND default-action 'accept'", "set firewall name INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall name INBOUND enable-default-log", "set firewall name INBOUND rule 101 protocol 'icmp'", "set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall name INBOUND rule 101 fragment 'match-frag'", "set firewall name INBOUND rule 101", "set firewall name INBOUND rule 101 disabled", "set firewall name INBOUND rule 101 action 'accept'", "set firewall name INBOUND rule 101 ipsec 'match-ipsec'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_02(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="tcp", source=dict( address="192.0.2.0", mac_address="38:00:25:19:76:0c", port=2127, ), destination=dict( address="192.0.1.0", port=2124 ), limit=dict( burst=10, rate=dict( number=20, unit="second" ), ), recent=dict(count=10, time=20), state=dict( established=True, related=True, invalid=True, new=True, ), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall name INBOUND rule 101 protocol 'tcp'", "set firewall name INBOUND rule 101 destination address 192.0.1.0", "set firewall name INBOUND rule 101 destination port 2124", "set firewall name INBOUND rule 101", "set firewall name INBOUND rule 101 source address 192.0.2.0", "set firewall name INBOUND rule 101 source mac-address 38:00:25:19:76:0c", "set firewall name INBOUND rule 101 source port 2127", "set firewall name INBOUND rule 101 state new enable", "set firewall name INBOUND rule 101 state invalid enable", "set firewall name INBOUND rule 101 state related enable", "set firewall name INBOUND rule 101 state established enable", "set firewall name INBOUND rule 101 limit burst 10", "set firewall name INBOUND rule 101 limit rate 20/second", "set firewall name INBOUND rule 101 recent count 10", "set firewall name INBOUND rule 101 recent time 20", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_03(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", destination=dict( group=dict( address_group="OUT-ADDR-GROUP", network_group="OUT-NET-GROUP", port_group="OUT-PORT-GROUP", ) ), source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ) ), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall name INBOUND rule 101 source group address-group IN-ADDR-GROUP", "set firewall name INBOUND rule 101 source group network-group IN-NET-GROUP", "set firewall name INBOUND rule 101 source group port-group IN-PORT-GROUP", "set firewall name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP", "set firewall name INBOUND rule 101 destination group network-group OUT-NET-GROUP", "set firewall name INBOUND rule 101 destination group port-group OUT-PORT-GROUP", "set firewall name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_04(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", time=dict( monthdays="2", startdate="2020-01-24", starttime="13:20:00", stopdate="2020-01-28", stoptime="13:30:00", weekdays="!Sat,Sun", utc=True, ), tcp=dict(flags="ALL"), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall name INBOUND rule 101", "set firewall name INBOUND rule 101 tcp flags ALL", "set firewall name INBOUND rule 101 time utc", "set firewall name INBOUND rule 101 time monthdays 2", "set firewall name INBOUND rule 101 time startdate 2020-01-24", "set firewall name INBOUND rule 101 time stopdate 2020-01-28", "set firewall name INBOUND rule 101 time weekdays !Sat,Sun", "set firewall name INBOUND rule 101 time stoptime 13:30:00", "set firewall name INBOUND rule 101 time starttime 13:20:00", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_01(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", disabled=True, + icmp=dict(type_name="echo-request"), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND default-action 'accept'", "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6-name INBOUND enable-default-log", "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 disabled", "set firewall ipv6-name INBOUND rule 101 action 'accept'", "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'", + "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_02(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="tcp", source=dict( address="2001:db8::12", mac_address="38:00:25:19:76:0c", port=2127, ), destination=dict( address="2001:db8::11", port=2124 ), limit=dict( burst=10, rate=dict( number=20, unit="second" ), ), recent=dict(count=10, time=20), state=dict( established=True, related=True, invalid=True, new=True, ), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND rule 101 protocol 'tcp'", "set firewall ipv6-name INBOUND rule 101 destination address 2001:db8::11", "set firewall ipv6-name INBOUND rule 101 destination port 2124", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 source address 2001:db8::12", "set firewall ipv6-name INBOUND rule 101 source mac-address 38:00:25:19:76:0c", "set firewall ipv6-name INBOUND rule 101 source port 2127", "set firewall ipv6-name INBOUND rule 101 state new enable", "set firewall ipv6-name INBOUND rule 101 state invalid enable", "set firewall ipv6-name INBOUND rule 101 state related enable", "set firewall ipv6-name INBOUND rule 101 state established enable", "set firewall ipv6-name INBOUND rule 101 limit burst 10", "set firewall ipv6-name INBOUND rule 101 recent count 10", "set firewall ipv6-name INBOUND rule 101 recent time 20", "set firewall ipv6-name INBOUND rule 101 limit rate 20/second", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_03(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", destination=dict( group=dict( address_group="OUT-ADDR-GROUP", network_group="OUT-NET-GROUP", port_group="OUT-PORT-GROUP", ) ), source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ) ), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND rule 101 source group address-group IN-ADDR-GROUP", "set firewall ipv6-name INBOUND rule 101 source group network-group IN-NET-GROUP", "set firewall ipv6-name INBOUND rule 101 source group port-group IN-PORT-GROUP", "set firewall ipv6-name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP", "set firewall ipv6-name INBOUND rule 101 destination group network-group OUT-NET-GROUP", "set firewall ipv6-name INBOUND rule 101 destination group port-group OUT-PORT-GROUP", "set firewall ipv6-name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_04(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", time=dict( monthdays="2", startdate="2020-01-24", starttime="13:20:00", stopdate="2020-01-28", stoptime="13:30:00", weekdays="!Sat,Sun", utc=True, ), tcp=dict(flags="ALL"), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 tcp flags ALL", "set firewall ipv6-name INBOUND rule 101 time utc", "set firewall ipv6-name INBOUND rule 101 time monthdays 2", "set firewall ipv6-name INBOUND rule 101 time startdate 2020-01-24", "set firewall ipv6-name INBOUND rule 101 time stopdate 2020-01-28", "set firewall ipv6-name INBOUND rule 101 time weekdays !Sat,Sun", "set firewall ipv6-name INBOUND rule 101 time stoptime 13:30:00", "set firewall ipv6-name INBOUND rule 101 time starttime 13:20:00", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict( type_name="port-unreachable" ), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND rule 101 icmpv6 type port-unreachable", "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6-name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type=1, code=1), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall name INBOUND rule 101 icmp type 1", "set firewall name INBOUND rule 101 icmp code 1", "set firewall name INBOUND rule 101 protocol 'icmp'", "set firewall name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_02(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type_name="echo-request"), ) ], ), ], ) ], state="merged", ) ) commands = [ "set firewall name INBOUND rule 101 icmp type-name echo-request", "set firewall name INBOUND rule 101 protocol 'icmp'", "set firewall name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_del_01(self): set_module_args( dict( config=[dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")])], state="deleted", ) ) commands = ["delete firewall name V4-INGRESS"] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_02(self): set_module_args( dict( config=[ dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")]), dict(afi="ipv6", rule_sets=[dict(name="V6-INGRESS")]), ], state="deleted", ) ) commands = [ "delete firewall name V4-INGRESS", "delete firewall ipv6-name V6-INGRESS", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_03(self): set_module_args(dict(config=[], state="deleted")) commands = ["delete firewall name", "delete firewall ipv6-name"] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_04(self): set_module_args( dict( config=[ dict(afi="ipv4", rule_sets=[dict(name="V4-ING")]), dict(afi="ipv6", rule_sets=[dict(name="V6-ING")]), ], state="deleted", ) ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_rep_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="reject", description="Rule 101 is configured by Ansible RM", ipsec="match-ipsec", protocol="tcp", fragment="match-frag", disabled=False, ), dict( number="102", action="accept", description="Rule 102 is configured by Ansible RM", protocol="icmp", disabled=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", description="This rule-set is configured by Ansible RM", ), dict( name="V6-EGRESS", default_action="reject", description="This rule-set is configured by Ansible RM", + rules=[ + dict( + icmp=dict(type_name="echo-request"), + number=20, + ) + ], ), ], ), ], state="replaced", ) ) commands = [ "delete firewall name V4-INGRESS rule 101 disabled", "delete firewall name V4-EGRESS default-action", "set firewall name V4-INGRESS description 'This is IPv4 INGRESS rule set'", "set firewall name V4-INGRESS rule 101 protocol 'tcp'", "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'", "set firewall name V4-INGRESS rule 101 action 'reject'", "set firewall name V4-INGRESS rule 102 disabled", "set firewall name V4-INGRESS rule 102 action 'accept'", "set firewall name V4-INGRESS rule 102 protocol 'icmp'", "set firewall name V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'", "set firewall name V4-INGRESS rule 102", "set firewall ipv6-name V6-INGRESS description 'This rule-set is configured by Ansible RM'", "set firewall ipv6-name V6-EGRESS description 'This rule-set is configured by Ansible RM'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_rep_02(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=False, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disabled=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="V6-EGRESS", default_action="reject", + rules=[ + dict( + icmp=dict(type_name="echo-request"), + number=20, + ) + ], ), ], ), ], state="replaced", ) ) commands = [ "delete firewall name V4-INGRESS enable-default-log", "delete firewall name V4-EGRESS default-action", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disabled=True, ) ], ), dict( name="V4-EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="V6-EGRESS", default_action="reject", + rules=[ + dict( + icmp=dict(type_name="echo-request"), + number=20, + ) + ], ), ], ), ], state="replaced", ) ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_mer_idem_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disabled=True, ) ], ), dict( name="V4-EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="V6-EGRESS", default_action="reject", + rules=[ + dict( + icmp=dict(type_name="echo-request"), + number=20, + ) + ], ), ], ), ], state="merged", ) ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-IN", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="1", action="reject", description="Rule 1 is configured by Ansible RM", ipsec="match-ipsec", protocol="tcp", fragment="match-frag", disabled=False, source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ) ), ), dict( number="2", action="accept", description="Rule 102 is configured by Ansible RM", protocol="icmp", disabled=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-IN", default_action="accept", description="This rule-set is configured by Ansible RM", ), dict( name="V6-EG", default_action="reject", description="This rule-set is configured by Ansible RM", ), ], ), ], state="overridden", ) ) commands = [ "delete firewall ipv6-name V6-INGRESS", "delete firewall ipv6-name V6-EGRESS", "delete firewall name V4-INGRESS", "delete firewall name V4-EGRESS", "set firewall name V4-IN default-action 'accept'", "set firewall name V4-IN description 'This is IPv4 INGRESS rule set'", "set firewall name V4-IN enable-default-log", "set firewall name V4-IN rule 1 protocol 'tcp'", "set firewall name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'", "set firewall name V4-IN rule 1 fragment 'match-frag'", "set firewall name V4-IN rule 1 source group address-group IN-ADDR-GROUP", "set firewall name V4-IN rule 1 source group network-group IN-NET-GROUP", "set firewall name V4-IN rule 1 source group port-group IN-PORT-GROUP", "set firewall name V4-IN rule 1", "set firewall name V4-IN rule 1 action 'reject'", "set firewall name V4-IN rule 1 ipsec 'match-ipsec'", "set firewall name V4-IN rule 2 disabled", "set firewall name V4-IN rule 2 action 'accept'", "set firewall name V4-IN rule 2 protocol 'icmp'", "set firewall name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'", "set firewall name V4-IN rule 2", "set firewall ipv6-name V6-IN default-action 'accept'", "set firewall ipv6-name V6-IN description 'This rule-set is configured by Ansible RM'", "set firewall ipv6-name V6-EG default-action 'reject'", "set firewall ipv6-name V6-EG description 'This rule-set is configured by Ansible RM'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_idem_01(self): set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disabled=True, ) ], ), dict( name="V4-EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="V6-EGRESS", default_action="reject", + rules=[ + dict( + icmp=dict(type_name="echo-request"), + number=20, + ) + ], ), ], ), ], state="overridden", ) ) self.execute_module(changed=False, commands=[])