diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cab10556..d5617c09 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,56 +1,56 @@ --- ci: # format compatible with commitlint autoupdate_commit_msg: "chore: pre-commit autoupdate" autoupdate_schedule: monthly autofix_commit_msg: "chore: auto fixes from pre-commit.com hooks" repos: - repo: https://github.com/ansible-network/collection_prep rev: 1.1.1 hooks: - id: update-docs - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.6.0 + rev: v5.0.0 hooks: - id: check-merge-conflict - id: check-symlinks - id: debug-statements - id: end-of-file-fixer - id: no-commit-to-branch args: [--branch, main] - id: trailing-whitespace - repo: https://github.com/asottile/add-trailing-comma rev: v3.1.0 hooks: - id: add-trailing-comma - repo: https://github.com/pre-commit/mirrors-prettier rev: v4.0.0-alpha.8 hooks: - id: prettier entry: env CI=1 bash -c "prettier --list-different . || ec=$? && prettier --loglevel=error --write . && exit $ec" pass_filenames: false args: [] additional_dependencies: - prettier - prettier-plugin-toml - repo: https://github.com/PyCQA/isort - rev: 5.13.2 + rev: 6.0.0 hooks: - id: isort name: Sort import statements using isort args: [--filter-files] - repo: https://github.com/psf/black - rev: 24.4.2 + rev: 25.1.0 hooks: - id: black - repo: https://github.com/pycqa/flake8 - rev: 7.1.0 + rev: 7.1.2 hooks: - id: flake8 diff --git a/README.md b/README.md index 5d8e1e8b..6650cb67 100644 --- a/README.md +++ b/README.md 
@@ -1,261 +1,264 @@ # VyOS Collection [![codecov](https://codecov.io/gh/vyos/vyos.vyos/graph/badge.svg?token=J217GFD69W)](https://codecov.io/gh/vyos/vyos.vyos) [![CI](https://github.com/vyos/vyos.vyos/actions/workflows/tests.yml/badge.svg?branch=main&event=schedule)](https://github.com/vyos/vyos.vyos/actions/workflows/tests.yml) The Ansible VyOS collection includes a variety of Ansible content to help automate the management of VyOS network appliances. This collection has been tested against VyOS 1.3.8, 1.4.1 and the current rolling release for 1.5. Where possible, compatibility with older versions of VyOS are maintained but not guaranteed. ## Communication * Join the VyOS forum: * [FAQ](https://forum.vyos.io/faq): find answers to frequently asked questions. * [Guides and How To](https://forum.vyos.io/c/howto-guies/27): find guides and how-to articles. * [News & Announcements](https://forum.vyos.io/c/announcements/6): track project-wide announcements . ## Ansible version compatibility This collection has been tested against following Ansible versions: **>=2.15.0**. For collections that support Ansible 2.9, please ensure you update your `network_os` to use the fully qualified collection name (for example, `cisco.ios.ios`). Plugins and modules within a collection may be tested with only specific Ansible versions. A collection may contain metadata that identifies these versions. PEP440 is the schema used to describe the versions of Ansible. ### Supported connections The VyOS collection supports ``network_cli`` connections. 
## Included content ### Cliconf plugins Name | Description --- | --- [vyos.vyos.vyos](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_cliconf.rst)|Use vyos cliconf to run command on VyOS platform ### Modules Name | Description --- | --- [vyos.vyos.vyos_banner](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_banner_module.rst)|Manage multiline banners on VyOS devices [vyos.vyos.vyos_bgp_address_family](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_bgp_address_family_module.rst)|BGP Address Family resource module [vyos.vyos.vyos_bgp_global](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_bgp_global_module.rst)|BGP global resource module [vyos.vyos.vyos_command](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_command_module.rst)|Run one or more commands on VyOS devices [vyos.vyos.vyos_config](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_config_module.rst)|Manage VyOS configuration on remote device [vyos.vyos.vyos_facts](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_facts_module.rst)|Get facts about vyos devices. [vyos.vyos.vyos_firewall_global](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_firewall_global_module.rst)|Firewall global resource module [vyos.vyos.vyos_firewall_interfaces](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_firewall_interfaces_module.rst)|Firewall interfaces resource module [vyos.vyos.vyos_firewall_rules](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_firewall_rules_module.rst)|Firewall rules resource module [vyos.vyos.vyos_hostname](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_hostname_module.rst)|Manages hostname resource module [vyos.vyos.vyos_interfaces](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_interfaces_module.rst)|Manages interface attributes of VyOS network devices. 
[vyos.vyos.vyos_l3_interfaces](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_l3_interfaces_module.rst)|Layer 3 interfaces resource module. [vyos.vyos.vyos_lag_interfaces](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_lag_interfaces_module.rst)|LAG interfaces resource module [vyos.vyos.vyos_lldp_global](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_lldp_global_module.rst)|LLDP global resource module [vyos.vyos.vyos_lldp_interfaces](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_lldp_interfaces_module.rst)|LLDP interfaces resource module [vyos.vyos.vyos_logging_global](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_logging_global_module.rst)|Logging resource module [vyos.vyos.vyos_ntp_global](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_ntp_global_module.rst)|NTP global resource module [vyos.vyos.vyos_ospf_interfaces](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_ospf_interfaces_module.rst)|OSPF Interfaces Resource Module. 
[vyos.vyos.vyos_ospfv2](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_ospfv2_module.rst)|OSPFv2 resource module [vyos.vyos.vyos_ospfv3](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_ospfv3_module.rst)|OSPFv3 resource module [vyos.vyos.vyos_ping](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_ping_module.rst)|Tests reachability using ping from VyOS network devices [vyos.vyos.vyos_prefix_lists](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_prefix_lists_module.rst)|Prefix-Lists resource module for VyOS [vyos.vyos.vyos_route_maps](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_route_maps_module.rst)|Route Map resource module [vyos.vyos.vyos_snmp_server](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_snmp_server_module.rst)|Manages snmp_server resource module [vyos.vyos.vyos_static_routes](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_static_routes_module.rst)|Static routes resource module [vyos.vyos.vyos_system](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_system_module.rst)|Run `set system` commands on VyOS devices [vyos.vyos.vyos_user](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_user_module.rst)|Manage the collection of local users on VyOS device [vyos.vyos.vyos_vlan](https://github.com/vyos/vyos.vyos/blob/main/docs/vyos.vyos.vyos_vlan_module.rst)|Manage VLANs on VyOS network devices Click the ``Content`` button to see the list of content included in this collection. 
## Installing this collection You can install the VyOS collection with the Ansible Galaxy CLI: ansible-galaxy collection install vyos.vyos You can also include it in a `requirements.yml` file and install it with `ansible-galaxy collection install -r requirements.yml`, using the format: ```yaml --- collections: - name: vyos.vyos ``` ## Using this collection This collection includes [network resource modules](https://docs.ansible.com/ansible/latest/network/user_guide/network_resource_modules.html). ### Using modules from the VyOS collection in your playbooks You can call modules by their Fully Qualified Collection Namespace (FQCN), such as `vyos.vyos.vyos_static_routes`. The following example task replaces configuration changes in the existing configuration on a VyOS network device, using the FQCN: ```yaml --- - name: Replace device configurations of listed static routes with provided configurations register: result vyos.vyos.vyos_static_routes: &id001 config: - address_families: - afi: ipv4 routes: - dest: 192.0.2.32/28 blackhole_config: distance: 2 next_hops: - forward_router_address: 192.0.2.7 - forward_router_address: 192.0.2.8 - forward_router_address: 192.0.2.9 state: replaced ``` **NOTE**: For Ansible 2.9, you may not see deprecation warnings when you run your playbooks with this collection. Use this documentation to track when a module is deprecated. ### See Also: * [VyOS Platform Options](https://docs.ansible.com/ansible/latest/network/user_guide/platform_vyos.html) * [Ansible Using collections](https://docs.ansible.com/ansible/latest/user_guide/collections_using.html) for more details. ## Contributing to this collection We welcome community contributions to this collection. If you find problems, please open an issue or create a PR against the [VyOS collection repository](https://github.com/vyos/vyos.vyos). See [Contributing to VyOS](https://vyos.net/contribute/) for complete details. 
You can also join us on: - Forum - https://forum.vyos.io See the [Contributing to VyOS](https://vyos.net/contribute/) for details on contributing to Ansible. ### Code of Conduct This collection follows the Ansible project's [Code of Conduct](https://docs.ansible.com/ansible/devel/community/code_of_conduct.html). Please read and familiarize yourself with this document. ### Updating from resource module models Some of our modules were templated using `resource_module_builder`, but some use the newer [`cli_rm_builder`](https://github.com/ansible-network/cli_rm_builder) which tempaltes baed on in-place device information, but also uses a new network parsing engine designed to simplify and standardize the parsing of network configuration. #### Using older *resource_module_builder* modules Last build was with a slightly-modified version of resource_module_builder. This changes the calling parameters for the resources. To update the collection from the resource module models, run the following command: ```bash ansible-playbook -e rm_dest=`pwd` \ -e structure=collection \ -e collection_org=vyos \ -e collection_name=vyos \ -e model=../../../resource_module_models/models/vyos/firewall_rules/vyos_firewall_rules.yaml \ ../../../resource_module_builder/site.yml ``` #### Using *cli_rm_builder* modules The newer `cli_rm_builder` works similarly to the older `resource_module_builder`, but pulls the information directly from the `DOCUMENTATION`, `EXAMPLES` and `RETURN` blocks in the module itself. To update the collection from the `cli_rm_builder` models, run the following command: ```bash ansible-playbook -e rm_dest=`pwd` \ -e collection_org=vyos \ -e collection_name=vyos \ -e resource=bgp_address_family \ ../../../cli_rm_builder/run.yml ``` Unlike the `resource_module_builder`, the `cli_rm_builder` does not require the `model` parameter. Instead, it uses the `resource` parameter to specify the resource to build. 
### Testing playbooks You can use `ANSIBLE_COLLECTIONS_PATH` to test the collection locally. For example: ``` ANSIBLE_COLLECTIONS_PATHS=~/my_dev_path ansible-playbook -i inventory.network test.yml ``` ### Integration Tests Integration tests are run using `ansible-test` and require that there be an inventory defined (you can pass this in with `--inventory `) and that the system be configured for access (recommended to use SSH keys). Additionally: - eth0 should be configured for `address dhcp` and should have an assigned address on the local network - eth1 and eth2 should be defined and uncofirgured (they'll be overwritten by the tests) - eth3 and beyond should not be present or interface-related tests will fail +- when using VMs for testing, ensure that the interfaces don't use `virtio`, as it will supress + some interface configurations. `e1000e` is a good choice for testing. - eth0 is also expected to show `duplex auto` and `speed auto` in the output of `show interfaces`, however others are not due to the fact that they are repeatedly deleted and recreated which causes the default values to be hidden. ## Changelogs - + +Change logs are available [here](https://github.com/vyos/vyos.vyos/blob/main/CHANGELOG.rst). ## Release notes Release notes are available [here](https://github.com/vyos/vyos.vyos/blob/main/CHANGELOG.rst). ## Roadmap Major Version | Ansible Support | VyOS Support | Details --- | --- | --- | --- 4.1.0 | 2.15 | 1.1.2 | Final release for the 4.x series 5.0.0 | 2.16 | 1.1.2 | First relase under VyOS control as a separate collection 6.0.0 | 2.18 | 1.3.8 | *Planned* release for supporting VyOS 1.3.8+ 7.0.0 | x.xx | 1.4.x | *Prospective* release deprecating incompatible 1.3.x modules Note: - Unreleased versions are not guaranteed to be released as described. - Some modules may support a wider variety of versions depending upon the compatibility with prior versions of VyOS. - The roadmap is subject to change based on community feedback and contributions. 
## More information VyOS resources - [Contributing to VyOS](https://vyos.net/contribute) - [VyOS documentation](https://docs.vyos.io/en/latest/) - [VyOS forum](https://forum.vyos.io) Ansible Resources - [Ansible network resources](https://docs.ansible.com/ansible/latest/network/getting_started/network_resources.html) - [Ansible Collection overview](https://github.com/ansible-collections/overview) - [Ansible User guide](https://docs.ansible.com/ansible/latest/user_guide/index.html) - [Ansible Developer guide](https://docs.ansible.com/ansible/latest/dev_guide/index.html) - [Ansible Community code of conduct](https://docs.ansible.com/ansible/latest/community/code_of_conduct.html) ## Licensing GNU General Public License v3.0 or later. See [LICENSE](https://www.gnu.org/licenses/gpl-3.0.txt) to see the full text. diff --git a/changelogs/fragments/T7162-interface-preflight.yaml b/changelogs/fragments/T7162-interface-preflight.yaml new file mode 100644 index 00000000..c829ca8b --- /dev/null +++ b/changelogs/fragments/T7162-interface-preflight.yaml @@ -0,0 +1,5 @@ +--- +trivial: + - fix pre-flight sets for interfaces + - restore previously-removed interface test duplex and speed + - remove unnecessary debug statements in integration tests diff --git a/changelogs/fragments/T7236_firewall_rules.yml b/changelogs/fragments/T7236_firewall_rules.yml new file mode 100644 index 00000000..bad04832 --- /dev/null +++ b/changelogs/fragments/T7236_firewall_rules.yml @@ -0,0 +1,3 @@ +--- +minor_changes: + - vyos_firewall_rules - Fixed comparing of firewall rules diff --git a/changelogs/fragments/T7259-get_config.yaml b/changelogs/fragments/T7259-get_config.yaml new file mode 100644 index 00000000..84bb1ab4 --- /dev/null +++ b/changelogs/fragments/T7259-get_config.yaml @@ -0,0 +1,3 @@ +--- +minor_changes: + - vyos_config - block get_config call if match is set to "none" 
diff --git a/changelogs/fragments/T7260-remove-last-firewall-group-member.yaml b/changelogs/fragments/T7260-remove-last-firewall-group-member.yaml
new file mode 100644
index 00000000..78e07356
--- /dev/null
+++ b/changelogs/fragments/T7260-remove-last-firewall-group-member.yaml
@@ -0,0 +1,3 @@
+---
+bugfixes:
+ - vyos_firewall_global - Fix removing last member of a firewall group.
diff --git a/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py b/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py
index e2a25e32..0d73d209 100644
--- a/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py
+++ b/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py
@@ -1,767 +1,793 @@ # # -*- coding: utf-8 -*- # Copyright 2019 Red Hat # GNU General Public License v3.0+ # (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) """ The vyos_firewall_global class It is in this file where the current configuration (as dict) is compared to the provided configuration (as dict) and the command set necessary to bring the current configuration to it's desired end-state is created """ from __future__ import absolute_import, division, print_function __metaclass__ = type from copy import deepcopy from ansible.module_utils.six import iteritems from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import ( ConfigBase, ) from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ( remove_empties, to_list, ) from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.facts import Facts from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.utils import ( list_diff_want_only, in_target_not_none, ) - +from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.version import ( + LooseVersion, +) from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.vyos import get_os_version -from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.version import LooseVersion - class Firewall_global(ConfigBase): """ The vyos_firewall_global class """ gather_subset = ["!all", "!min"] gather_network_resources = ["firewall_global"] def __init__(self, module): super(Firewall_global, self).__init__(module) def get_firewall_global_facts(self, data=None): """Get the 'facts' (the current configuration) :rtype: A dictionary :returns: The current configuration as a dictionary """ facts, _warnings = Facts(self._module).get_facts( self.gather_subset, self.gather_network_resources, data=data, ) firewall_global_facts = facts["ansible_network_resources"].get("firewall_global") if not firewall_global_facts: return [] return 
firewall_global_facts def execute_module(self): """Execute the module :rtype: A dictionary :returns: The result from module execution """ result = {"changed": False} warnings = list() commands = list() if self.state in self.ACTION_STATES: existing_firewall_global_facts = self.get_firewall_global_facts() else: existing_firewall_global_facts = [] if self.state in self.ACTION_STATES or self.state == "rendered": commands.extend(self.set_config(existing_firewall_global_facts)) if commands and self.state in self.ACTION_STATES: if not self._module.check_mode: self._connection.edit_config(commands) result["changed"] = True if self.state in self.ACTION_STATES: result["commands"] = commands if self.state in self.ACTION_STATES or self.state == "gathered": changed_firewall_global_facts = self.get_firewall_global_facts() elif self.state == "rendered": result["rendered"] = commands elif self.state == "parsed": running_config = self._module.params["running_config"] if not running_config: self._module.fail_json( msg="value of running_config parameter must not be empty for state parsed", ) result["parsed"] = self.get_firewall_global_facts(data=running_config) else: changed_firewall_global_facts = [] if self.state in self.ACTION_STATES: result["before"] = existing_firewall_global_facts if result["changed"]: result["after"] = changed_firewall_global_facts elif self.state == "gathered": result["gathered"] = changed_firewall_global_facts result["warnings"] = warnings return result def set_config(self, existing_firewall_global_facts): """Collect the configuration from the args passed to the module, collect the current configuration (as a dict from facts) :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ want = self._module.params["config"] have = existing_firewall_global_facts resp = self.set_state(want, have) return to_list(resp) def set_state(self, w, h): """Select the appropriate function based on the state provided 
:param want: the desired configuration as a dictionary :param have: the current configuration as a dictionary :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ commands = [] if self.state in ("merged", "replaced", "rendered") and not w: self._module.fail_json( msg="value of config parameter must not be empty for state {0}".format(self.state), ) if self.state == "deleted": commands.extend(self._state_deleted(want=None, have=h)) elif w: if self.state == "merged" or self.state == "rendered": commands.extend(self._state_merged(w, h)) elif self.state == "replaced": commands.extend(self._state_replaced(w, h)) return commands def _state_replaced(self, w, h): """The command generator when state is replaced :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ commands = [] if h: commands.extend(self._state_deleted(h, w)) commands.extend(self._state_merged(w, h)) return commands def _state_merged(self, want, have): """The command generator when state is merged :rtype: A list :returns: the commands necessary to merge the provided into the current configuration """ commands = [] commands.extend(self._add_global_attr(want, have)) return commands def _state_deleted(self, want, have): """The command generator when state is deleted :rtype: A list :returns: the commands necessary to remove the current configuration of the provided objects """ commands = [] b_set = ( "config_trap", "validation", "log_martians", "syn_cookies", "twa_hazards_protection", ) if want: for key, val in iteritems(want): if val and key in b_set and not have: commands.append(self._form_attr_cmd(attr=key, opr=False)) elif val and key in b_set and have and key in have and have[key] != val: commands.append(self._form_attr_cmd(attr=key, opr=False)) else: commands.extend(self._render_attr_config(want, have, key)) elif not want and have: 
commands.append(self._compute_command(opr=False)) elif have: for key, val in iteritems(have): if val and key in b_set: commands.append(self._form_attr_cmd(attr=key, opr=False)) else: commands.extend(self._render_attr_config(want, have, key)) return commands def _render_attr_config(self, w, h, key, opr=False): """ This function invoke the function to extend commands based on the key. :param w: the desired configuration. :param h: the current configuration. :param key: attribute name :param opr: operation :return: list of commands """ commands = [] if key == "ping": commands.extend(self._render_ping(key, w, h, opr=opr)) elif key == "group": commands.extend(self._render_group(key, w, h, opr=opr)) elif key == "state_policy": commands.extend(self._render_state_policy(key, w, h, opr=opr)) elif key == "route_redirects": commands.extend(self._render_route_redirects(key, w, h, opr=opr)) return commands def _add_global_attr(self, w, h, opr=True): """ This function forms the set/delete commands based on the 'opr' type for firewall_global attributes. :param w: the desired config. :param h: the target config. :param opr: True/False. :return: generated commands list. 
""" commands = [] w_fg = deepcopy(remove_empties(w)) l_set = ( "config_trap", "validation", "log_martians", "syn_cookies", "twa_hazards_protection", ) if w_fg: for key, val in iteritems(w_fg): if opr and key in l_set and not (h and self._is_w_same(w_fg, h, key)): commands.append( self._form_attr_cmd(attr=key, val=self._bool_to_str(val), opr=opr), ) elif not opr: if key and self._is_del(l_set, h): commands.append( self._form_attr_cmd(attr=key, key=self._bool_to_str(val), opr=opr), ) continue - if ( - key in l_set - and not self._in_target(h, key) - and not self._is_del(l_set, h) - ): + if key in l_set and not self._in_target(h, key) and not self._is_del(l_set, h): commands.append( self._form_attr_cmd(attr=key, val=self._bool_to_str(val), opr=opr), ) else: commands.extend(self._render_attr_config(w_fg, h, key, opr)) return commands def _render_ping(self, attr, w, h, opr): """ This function forms the commands for 'ping' attributes based on the 'opr'. :param attr: attribute name. :param w: the desired configuration. :param h: the target config. :param opr: True/False. :return: generated list of commands. """ commands = [] h_ping = {} l_set = ("all", "broadcast") if h: h_ping = h.get(attr) or {} if self._is_root_del(w[attr], h_ping, attr): for item, value in iteritems(h[attr]): if not opr and item in l_set: commands.append(self._form_attr_cmd(attr=item, opr=opr)) elif w[attr]: if h and attr in h.keys(): h_ping = h.get(attr) or {} for item, value in iteritems(w[attr]): if ( opr and item in l_set and not (h_ping and self._is_w_same(w[attr], h_ping, item)) ): commands.append( self._form_attr_cmd(attr=item, val=self._bool_to_str(value), opr=opr), ) elif ( not opr and item in l_set and not (h_ping and self._is_w_same(w[attr], h_ping, item)) ): commands.append(self._form_attr_cmd(attr=item, opr=opr)) return commands def _render_group(self, attr, w, h, opr): """ This function forms the commands for 'group' attribute based on the 'opr'. :param attr: attribute name. 
:param w: base config. :param h: target config. :param opr: True/False. :return: generated list of commands. """ commands = [] h_grp = {} if not opr and self._is_root_del(h, w, attr): commands.append(self._form_attr_cmd(attr=attr, opr=opr)) else: if h: h_grp = h.get("group") or {} if w: commands.extend(self._render_grp_mem("port_group", w["group"], h_grp, opr)) commands.extend(self._render_grp_mem("address_group", w["group"], h_grp, opr)) commands.extend(self._render_grp_mem("network_group", w["group"], h_grp, opr)) return commands def _render_grp_mem(self, attr, w, h, opr): """ This function forms the commands for group list/members attributes based on the 'opr'. :param attr: attribute name. :param w: the desired config. :param h: the target config. :param opr: True/False. :return: generated list of commands. """ commands = [] h_grp = [] w_grp = [] l_set = ("name", "description") if w: w_grp = w.get(attr) or [] if h: h_grp = h.get(attr) or [] if w_grp: for want in w_grp: h = self.search_attrib_in_have(h_grp, want, "name") if "afi" in want and want["afi"] == "ipv6": cmd = self._compute_command(key="group", attr="ipv6-" + attr, opr=opr) else: cmd = self._compute_command(key="group", attr=attr, opr=opr) for key, val in iteritems(want): if val: if opr and key in l_set and not (h and self._is_w_same(want, h, key)): if key == "name": commands.append(cmd + " " + str(val)) else: commands.append( cmd + " " + want["name"] + " " + key + " '" + str(want[key]) + "'", ) elif not opr and key in l_set: if key == "name" and self._is_grp_del(h, want, key): + if commands[-1] == cmd + " " + want["name"] + " " + self._grp_type(attr): + commands.pop() commands.append(cmd + " " + want["name"]) continue if not (h and in_target_not_none(h, key)) and not self._is_grp_del(h, want, "name"): commands.append(cmd + " " + want["name"] + " " + key) elif key == "members": commands.extend( self._render_ports_addrs( key, want, h, opr, cmd, want["name"], attr, ), ) return commands def 
_render_ports_addrs(self, attr, w, h, opr, cmd, name, type): """ This function forms the commands for port/address/network group members based on the 'opr'. :param attr: attribute name. :param w: the desired config. :param h: the target config. :param cmd: commands to be prepend. :param name: name of group. :param type: group type. :return: generated list of commands. """ commands = [] have = [] if w: want = w.get(attr) or [] if h: have = h.get(attr) or [] if want: if opr: members = list_diff_want_only(want, have) for member in members: commands.append( cmd + " " + name + " " + self._grp_type(type) + " " + member[self._get_mem_type(type)], ) elif not opr and have: members = list_diff_want_only(want, have) for member in members: commands.append( cmd + " " + name + " " + self._grp_type(type) + " " + member[self._get_mem_type(type)], ) + elif not opr and not have: + commands.append( + cmd + + " " + + name + + " " + + self._grp_type(type), + ) return commands def _get_mem_type(self, group): """ This function returns the member type based on the type of group. """ return "port" if group == "port_group" else "address" def _render_state_policy(self, attr, w, h, opr): """ This function forms the commands for 'state-policy' attributes based on the 'opr'. :param attr: attribute name. :param w: the desired config. :param h: the target config. :param opr: True/False. :return: generated list of commands. 
""" commands = [] have = [] if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): l_set = ("log", "action", "connection_type", "log_level") else: l_set = ("log", "action", "connection_type") if not opr and self._is_root_del(h, w, attr): commands.append(self._form_attr_cmd(attr=attr, opr=opr)) else: w_sp = deepcopy(remove_empties(w)) want = w_sp.get(attr) or [] if h: have = h.get(attr) or [] if want: for w in want: h = self.search_attrib_in_have(have, w, "connection_type") for key, val in iteritems(w): if val and key != "connection_type": if opr and key in l_set and not (h and self._is_w_same(w, h, key)): - if key == "log" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): + if key == "log" and LooseVersion( + get_os_version(self._module), + ) >= LooseVersion("1.4"): commands.append( self._form_attr_cmd( key=attr + " " + w["connection_type"], attr=key, opr=opr, ), ) else: commands.append( self._form_attr_cmd( key=attr + " " + w["connection_type"], attr=key, val=self._bool_to_str(val), opr=opr, ), ) elif not opr and key in l_set: if not h: commands.append( self._form_attr_cmd( attr=attr + " " + w["connection_type"], opr=opr, ), ) break # delete the whole thing and move on - if (not self._in_target(h, key) or h[key] is None) and (self._in_target(w, key) and w[key]): + if (not self._in_target(h, key) or h[key] is None) and ( + self._in_target(w, key) and w[key] + ): # delete if not being replaced and value currently exists commands.append( self._form_attr_cmd( attr=attr + " " + w["connection_type"] + " " + key, val=self._bool_to_str(val), opr=opr, ), ) return commands def _render_route_redirects(self, attr, w, h, opr): """ This function forms the commands for 'route_redirects' attributes based on the 'opr'. :param attr: attribute name. :param w: the desired config. :param h: the target config. :param opr: True/False. :return: generated list of commands. 
""" commands = [] have = [] l_set = ("afi", "ip_src_route") if w: want = w.get(attr) or [] if h: have = h.get(attr) or [] if want: for w in want: h = self.search_attrib_in_have(have, w, "afi") - if 'afi' in w: - afi = w['afi'] + if "afi" in w: + afi = w["afi"] else: - if h and 'afi' in h: - afi = h['afi'] + if h and "afi" in h: + afi = h["afi"] else: afi = None afi = None for key, val in iteritems(w): if val and key != "afi": if opr and key in l_set and not (h and self._is_w_same(w, h, key)): commands.append( self._form_attr_cmd( attr=key, val=self._bool_to_str(val), opr=opr, - type=afi + type=afi, ), ) elif not opr and key in l_set: if self._is_del(l_set, h): commands.append( self._form_attr_cmd( attr=key, val=self._bool_to_str(val), opr=opr, - type=afi + type=afi, ), ) continue if not (h and self._in_target(h, key)) and not self._is_del(l_set, h): commands.append( self._form_attr_cmd( attr=key, val=self._bool_to_str(val), opr=opr, - type=afi + type=afi, ), ) elif key == "icmp_redirects": commands.extend(self._render_icmp_redirects(key, w, h, opr)) return commands def _render_icmp_redirects(self, attr, w, h, opr): """ This function forms the commands for 'icmp_redirects' attributes based on the 'opr'. :param attr: attribute name. :param w: the desired config. :param h: the target config. :param opr: True/False. :return: generated list of commands. 
""" commands = [] h_red = {} l_set = ("send", "receive") - if w and 'afi' in w: - afi = w['afi'] + if w and "afi" in w: + afi = w["afi"] else: - if h and 'afi' in h: - afi = h['afi'] + if h and "afi" in h: + afi = h["afi"] else: afi = None if w[attr]: if h and attr in h.keys(): h_red = h.get(attr) or {} for item, value in iteritems(w[attr]): if opr and item in l_set and not (h_red and self._is_w_same(w[attr], h_red, item)): commands.append( - self._form_attr_cmd(attr=item, val=self._bool_to_str(value), opr=opr, type=afi) + self._form_attr_cmd( + attr=item, + val=self._bool_to_str(value), + opr=opr, + type=afi, + ), ) elif ( not opr and item in l_set and not (h_red and self._is_w_same(w[attr], h_red, item)) ): commands.append(self._form_attr_cmd(attr=item, opr=opr, type=afi)) return commands def search_attrib_in_have(self, have, want, attr): """ This function returns the attribute if it is present in target config. :param have: the target config. :param want: the desired config. :param attr: attribute name . :return: attribute/None """ if have: for h in have: if h[attr] == want[attr]: return h return None def _form_attr_cmd(self, key=None, attr=None, val=None, opr=True, type=None): """ This function forms the command for leaf attribute. :param key: parent key. :param attr: attribute name :param value: value :param opr: True/False. :param type: AF type of attribute. :return: generated command. """ - command = self._compute_command(key=key, attr=self._map_attrib(attr, type=type), val=val, opr=opr) + command = self._compute_command( + key=key, + attr=self._map_attrib(attr, type=type), + val=val, + opr=opr, + ) return command def _compute_command(self, key=None, attr=None, val=None, remove=False, opr=True): """ This function construct the add/delete command based on passed attributes. :param key: parent key. :param attr: attribute name :param value: value :param remove: True/False. :param opr: True/False. :return: generated command. 
""" if remove or not opr: cmd = "delete firewall " else: cmd = "set firewall " - if attr and key != "group" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): + if ( + attr + and key != "group" + and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4") + ): cmd += "global-options " if key: cmd += key.replace("_", "-") + " " if attr: cmd += attr.replace("_", "-") if val and opr: - if key == "state_policy" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): + if key == "state_policy" and LooseVersion(get_os_version(self._module)) >= LooseVersion( + "1.4", + ): cmd += "" else: cmd += " '" + str(val) + "'" return cmd.strip() def _bool_to_str(self, val): """ This function converts the bool value into string. :param val: bool value. :return: enable/disable. """ return "enable" if str(val) == "True" else "disable" if str(val) == "False" else val def _grp_type(self, val): """ This function returns the group member type based on value argument. :param val: value. :return: member type. """ return ( "address" if val == "address_group" else "network" if val == "network_group" else "port" ) def _is_w_same(self, w, h, key): """ This function checks whether the key value is same in desired and target config dictionary. :param w: base config. :param h: target config. :param key:attribute name. :return: True/False. """ return True if h and key in h and h[key] == w[key] else False def _in_target(self, h, key): """ This function checks whether the target exist and key present in target config. :param h: target config. :param key: attribute name. :return: True/False. """ return True if h and key in h else False def _is_grp_del(self, w, h, key): """ This function checks whether group needed to be deleted based on desired and target configs. :param w: the desired config. :param h: the target config. :param key: group name. :return: True/False. 
""" return True if h and key in h and (not w or key not in w or not w[key]) else False def _is_root_del(self, w, h, key): """ This function checks whether a root attribute which can have further child attributes needed to be deleted. :param w: the desired config. :param h: the target config. :param key: attribute name. :return: True/False. """ return True if h and key in h and (not w or key not in w or not w[key]) else False def _is_del(self, b_set, h, key="number"): """ This function checks whether attribute needs to be deleted when operation is false and attribute present in present target config. :param b_set: attribute set. :param h: target config. :param key: number. :return: True/False. """ return key in b_set and not self._in_target(h, key) def _map_attrib(self, attrib, type=None): """ - This function construct the regex string. - replace the underscore with hyphen. :param attrib: attribute :return: regex string """ regex = attrib.replace("_", "-") if attrib == "send": if type == "ipv6": regex = "ipv6-send-redirects" else: regex = "send-redirects" elif attrib == "ip_src_route": if type == "ipv6": regex = "ipv6-src-route" elif attrib == "receive": if type == "ipv6": regex = "ipv6-receive-redirects" else: regex = "receive-redirects" elif attrib == "disabled": regex = "disable" elif attrib == "all": regex = "all-ping" elif attrib == "broadcast": regex = "broadcast-ping" elif attrib == "validation": regex = "source-validation" return regex diff --git a/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py b/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py index 5c2ef6ca..bb6055b7 100644 --- a/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py +++ b/plugins/module_utils/network/vyos/config/firewall_rules/firewall_rules.py 
@@ -1,1179 +1,1185 @@ # # -*- coding: utf-8 -*- # Copyright 2019 Red Hat # GNU General Public License v3.0+ # (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) """ The vyos_firewall_rules class It is in this file where the current configuration (as dict) is compared to the provided configuration (as dict) and the command set necessary to bring the current configuration to it's desired end-state is created """ from __future__ import absolute_import, division, print_function __metaclass__ = type from copy import deepcopy from ansible.module_utils.six import iteritems from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import ( ConfigBase, ) from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ( remove_empties, to_list, ) from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.facts import Facts from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.utils import ( list_diff_want_only, ) from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.version import ( LooseVersion, ) from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.vyos import ( get_os_version, load_config, ) class Firewall_rules(ConfigBase): """ The vyos_firewall_rules class """ gather_subset = [ "!all", "!min", ] gather_network_resources = [ "firewall_rules", ] def __init__(self, module): super(Firewall_rules, self).__init__(module) def get_firewall_rules_facts(self, data=None): """Get the 'facts' (the current configuration) :rtype: A dictionary :returns: The current configuration as a dictionary """ facts, _warnings = Facts(self._module).get_facts( self.gather_subset, self.gather_network_resources, data=data, ) firewall_rules_facts = facts["ansible_network_resources"].get("firewall_rules") if not firewall_rules_facts: return [] return firewall_rules_facts def execute_module(self): """Execute the module :rtype: A dictionary :returns: The result from 
module execution """ result = {"changed": False} warnings = list() commands = list() diff = None try: self._module.params["comment"] except KeyError: comment = [] else: comment = self._module.params["comment"] if self.state in self.ACTION_STATES: existing_firewall_rules_facts = self.get_firewall_rules_facts() else: existing_firewall_rules_facts = [] if self.state in self.ACTION_STATES or self.state == "rendered": commands.extend(self.set_config(deepcopy(existing_firewall_rules_facts))) if commands and self._module._diff: commit = not self._module.check_mode diff = load_config(self._module, commands, commit=commit, comment=comment) if diff: result["diff"] = {"prepared": str(diff)} if commands and self.state in self.ACTION_STATES: if not self._module.check_mode: self._connection.edit_config(commands) result["changed"] = True if self.state in self.ACTION_STATES: result["commands"] = commands if self.state in self.ACTION_STATES or self.state == "gathered": changed_firewall_rules_facts = self.get_firewall_rules_facts() elif self.state == "rendered": result["rendered"] = commands elif self.state == "parsed": running_config = self._module.params["running_config"] if not running_config: self._module.fail_json( msg="value of running_config parameter must not be empty for state parsed", ) result["parsed"] = self.get_firewall_rules_facts(data=running_config) else: changed_firewall_rules_facts = [] if self.state in self.ACTION_STATES: result["before"] = existing_firewall_rules_facts if result["changed"]: result["after"] = changed_firewall_rules_facts elif self.state == "gathered": result["gathered"] = changed_firewall_rules_facts result["warnings"] = warnings return result def set_config(self, existing_firewall_rules_facts): """Collect the configuration from the args passed to the module, collect the current configuration (as a dict from facts) :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ want = 
self._module.params["config"] self._prune_stubs(want) have = existing_firewall_rules_facts resp = self.set_state(want, have) return to_list(resp) def set_state(self, w, h): """Select the appropriate function based on the state provided :param want: the desired configuration as a dictionary :param have: the current configuration as a dictionary :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ commands = [] if self.state in ("merged", "replaced", "overridden", "rendered") and not w: self._module.fail_json( msg="value of config parameter must not be empty for state {0}".format(self.state), ) if self.state == "overridden": commands.extend(self._state_overridden(w, h)) elif self.state == "deleted": commands.extend(self._state_deleted(w, h)) elif w: if self.state == "merged" or self.state == "rendered": commands.extend(self._state_merged(w, h)) elif self.state == "replaced": commands.extend(self._state_replaced(w, h)) return commands def _state_replaced(self, want, have): """The command generator when state is replaced :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ commands = [] if have: # Iterate over the afi rule sets we already have. for h in have: r_sets = self._get_r_sets(h) # Iterate over each rule set we already have. for rs in r_sets: # In the desired configuration, search for the rule set we # already have (to be replaced by our desired # configuration's rule set). rs_id = self._rs_id(rs, h["afi"]) wanted_rule_set = self.search_r_sets_in_have(want, rs_id, "r_list") if self._is_same_rs(remove_empties(wanted_rule_set), remove_empties(rs)): continue if wanted_rule_set is not None: # Remove the rules that we already have if the wanted # rules exist under the same name. commands.extend( self._add_r_sets( h["afi"], want=rs, have=wanted_rule_set, opr=False, ), ) # Merge the desired configuration into what we already have. 
commands.extend(self._state_merged(want, have)) return commands def _state_overridden(self, want, have): """The command generator when state is overridden :rtype: A list :returns: the commands necessary to migrate the current configuration to the desired configuration """ commands = [] if have: for h in have: have_r_sets = self._get_r_sets(h) for rs in have_r_sets: rs_id = self._rs_id(rs, h["afi"]) w = self.search_r_sets_in_have(want, rs_id, "r_list") if self._is_same_rs(remove_empties(w), remove_empties(rs)): continue else: commands.append(self._compute_command(rs_id, remove=True)) # Blank out the only rule set that it is removed. for entry in have: if entry["afi"] == rs_id["afi"] and rs_id["name"]: entry["rule_sets"] = [ rule_set for rule_set in entry["rule_sets"] if rule_set.get("name") != rs_id["name"] ] elif entry["afi"] == rs_id["afi"] and rs_id["filter"]: entry["rule_sets"] = [ rule_set for rule_set in entry["rule_sets"] if rule_set.get("filter") != rs_id["filter"] ] commands.extend(self._state_merged(want, have)) return commands def _state_merged(self, want, have): """The command generator when state is merged :rtype: A list :returns: the commands necessary to merge the provided into the current configuration """ commands = [] for w in want: r_sets = self._get_r_sets(w) for rs in r_sets: rs_id = self._rs_id(rs, w["afi"]) h = self.search_r_sets_in_have(have, rs_id, "r_list") if self._is_same_rs(remove_empties(h), remove_empties(rs)): continue else: commands.extend(self._add_r_sets(w["afi"], rs, h)) return commands def _state_deleted(self, want, have): """The command generator when state is deleted :rtype: A list :returns: the commands necessary to remove the current configuration of the provided objects """ commands = [] if want: for w in want: r_sets = self._get_r_sets(w) if r_sets: for rs in r_sets: rs_id = self._rs_id(rs, w["afi"]) h = self.search_r_sets_in_have(have, rs_id, "r_list") if h: commands.append(self._compute_command(rs_id, remove=True)) elif 
have: for h in have: if h["afi"] == w["afi"]: commands.append( self._compute_command(self._rs_id(None, w["afi"]), remove=True), ) elif have: for h in have: r_sets = self._get_r_sets(h) if r_sets: commands.append(self._compute_command(self._rs_id(None, h["afi"]), remove=True)) return commands def _add_r_sets(self, afi, want, have, opr=True): """ This function forms the set/delete commands based on the 'opr' type for rule-sets attributes. :param afi: address type. :param want: desired config. :param have: target config. :param opr: True/False. :return: generated commands list. """ commands = [] l_set = ("description", "default_action", "default_jump_target", "enable_default_log") h_rs = {} h_rules = {} w_rs = deepcopy(remove_empties(want)) w_rules = w_rs.pop("rules", None) rs_id = self._rs_id(want, afi=afi) if have: h_rs = deepcopy(remove_empties(have)) h_rules = h_rs.pop("rules", None) if w_rs: for key, val in iteritems(w_rs): if opr and key in l_set and not (h_rs and self._is_w_same(w_rs, h_rs, key)): if key == "enable_default_log": if val and (not h_rs or key not in h_rs or not h_rs[key]): commands.append(self._add_rs_base_attrib(rs_id, key, w_rs)) else: commands.append(self._add_rs_base_attrib(rs_id, key, w_rs)) elif not opr and key in l_set: if ( key == "enable_default_log" and val and h_rs and (key not in h_rs or not h_rs[key]) ): commands.append(self._add_rs_base_attrib(rs_id, key, w_rs, opr)) elif not (h_rs and self._in_target(h_rs, key)): commands.append(self._add_rs_base_attrib(rs_id, key, w_rs, opr)) commands.extend(self._add_rules(rs_id, w_rules, h_rules, opr)) if h_rules: have["rules"] = h_rules if w_rules: want["rules"] = w_rules return commands def _add_rules(self, rs_id, w_rules, h_rules, opr=True): """ This function forms the set/delete commands based on the 'opr' type for rules attributes. :param rs_id: rule-set identifier. :param w_rules: desired config. :param h_rules: target config. :param opr: True/False. :return: generated commands list. 
""" commands = [] l_set = ( "ipsec", "action", "number", "protocol", "fragment", "disable", "description", "jump_target", ) if w_rules: for w in w_rules: cmd = self._compute_command(rs_id, w["number"], opr=opr) h = self.search_rules_in_have_rs(h_rules, w["number"]) if w != h and self.state == "replaced": h = {} for key, val in iteritems(w): if val: if opr and key in l_set and not (h and self._is_w_same(w, h, key)): if key == "disable": if not (not val and (not h or key not in h or not h[key])): commands.append(self._add_r_base_attrib(rs_id, key, w)) else: commands.append(self._add_r_base_attrib(rs_id, key, w)) elif not opr: # Note: if you are experiencing sticky configuration on replace # you may need to add an explicit check for the key here. Anything that # doesn't have a custom operation is taken care of by the `l_set` check # below, but I'm not sure how any of the others work. # It's possible that historically the delete was forced (but now it's # checked). if key == "number" and self._is_del(l_set, h): commands.append(self._add_r_base_attrib(rs_id, key, w, opr=opr)) continue if ( key == "tcp" and val and h and (key not in h or not h[key] or h[key] != w[key]) ): commands.extend(self._add_tcp(key, w, h, cmd, opr)) if ( key == "state" and val and h and (key not in h or not h[key] or h[key] != w[key]) ): commands.extend(self._add_state(key, w, h, cmd, opr)) if ( key == "icmp" and val and h and (key not in h or not h[key] or h[key] != w[key]) ): commands.extend(self._add_icmp(key, w, h, cmd, opr)) if ( key in ("packet_length", "packet_length_exclude") and val and h and (key not in h or not h[key] or h[key] != w[key]) ): commands.extend(self._add_packet_length(key, w, h, cmd, opr)) elif key == "disable" and val and h and (key not in h or not h[key]): commands.append(self._add_r_base_attrib(rs_id, key, w, opr=opr)) if ( key in ("inbound_interface", "outbound_interface") and val and h and (key not in h or not h[key] or h[key] != w[key]) ): 
commands.extend(self._add_interface(key, w, h, cmd, opr)) elif ( key in l_set and not (h and self._in_target(h, key)) and not self._is_del(l_set, h) ): commands.append(self._add_r_base_attrib(rs_id, key, w, opr=opr)) elif key == "tcp": commands.extend(self._add_tcp(key, w, h, cmd, opr)) elif key == "time": commands.extend(self._add_time(key, w, h, cmd, opr)) elif key == "icmp": commands.extend(self._add_icmp(key, w, h, cmd, opr)) elif key == "state": commands.extend(self._add_state(key, w, h, cmd, opr)) elif key == "log": commands.extend(self._add_log(key, w, h, cmd, opr)) elif key == "limit": commands.extend(self._add_limit(key, w, h, cmd, opr)) elif key == "recent": commands.extend(self._add_recent(key, w, h, cmd, opr)) elif key == "destination" or key == "source": commands.extend(self._add_src_or_dest(key, w, h, cmd, opr)) elif key in ("packet_length", "packet_length_exclude"): commands.extend(self._add_packet_length(key, w, h, cmd, opr)) elif key in ("inbound_interface", "outbound_interface"): commands.extend(self._add_interface(key, w, h, cmd, opr)) return commands def _add_state(self, attr, w, h, cmd, opr): """ This function forms the command for 'state' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. 
""" h_state = {} commands = [] l_set = ("new", "invalid", "related", "established") if w[attr]: if h and attr in h.keys(): h_state = h.get(attr) or {} for item, val in iteritems(w[attr]): if ( opr and item in l_set and not (h_state and self._is_w_same(w[attr], h_state, item)) ): if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): commands.append(cmd + (" " + attr + " " + item)) else: commands.append( cmd + (" " + attr + " " + item + " " + self._bool_to_str(val)), ) elif not opr and item in l_set and not self._in_target(h_state, item): commands.append(cmd + (" " + attr + " " + item)) return commands def _add_log(self, attr, w, h, cmd, opr): """ This function forms the command for 'log' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ h_state = {} commands = [] if w[attr]: if h and attr in h.keys(): h_state = h.get(attr) or {} if ( LooseVersion(get_os_version(self._module)) < LooseVersion("1.4") and opr and not (h and self._is_w_same(w, h, attr)) ): commands.append(cmd + " " + attr + " '" + w[attr] + "'") elif ( LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4") and opr and not (h and self._is_w_same(w, h, attr)) ): commands.append(cmd + " " + attr) elif not opr and not self._in_target(h_state, w[attr]): commands.append(cmd + (" " + attr + " '" + w[attr] + "'")) return commands def _add_recent(self, attr, w, h, cmd, opr): """ This function forms the command for 'recent' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. 
""" commands = [] h_recent = {} l_set = ("count", "time") if w[attr]: if h and attr in h.keys(): h_recent = h.get(attr) or {} for item, val in iteritems(w[attr]): if ( opr and item in l_set and not (h_recent and self._is_w_same(w[attr], h_recent, item)) ): commands.append(cmd + (" " + attr + " " + item + " " + str(val))) elif ( not opr and item in l_set and not (h_recent and self._in_target(h_recent, item)) ): commands.append(cmd + (" " + attr + " " + item)) return commands def _add_icmp(self, attr, w, h, cmd, opr): """ This function forms the commands for 'icmp' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ commands = [] h_icmp = {} l_set = ("code", "type", "type_name") if w[attr]: if h and attr in h.keys(): h_icmp = h.get(attr) or {} for item, val in iteritems(w[attr]): if ( opr and item in l_set and not (h_icmp and self._is_w_same(w[attr], h_icmp, item)) ): if item == "type_name": if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): param_name = "type-name" else: param_name = "type" if "ipv6" in cmd: # ipv6-name or ipv6 commands.append(cmd + (" " + "icmpv6" + " " + param_name + " " + val)) else: commands.append( cmd + (" " + attr + " " + item.replace("_", "-") + " " + val), ) else: if "ipv6" in cmd: # ipv6-name or ipv6 commands.append(cmd + (" " + "icmpv6" + " " + item + " " + str(val))) else: commands.append(cmd + (" " + attr + " " + item + " " + str(val))) elif not opr and item in l_set and not self._in_target(h_icmp, item): commands.append( cmd + (" " + attr + " " + item.replace("_", "-") + " " + str(val)), ) return commands def _add_interface(self, attr, w, h, cmd, opr): """ This function forms the commands for 'interface' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. 
""" commands = [] h_if = {} l_set = ("name", "group") if w[attr]: if h and attr in h.keys(): h_if = h.get(attr) or {} for item, val in iteritems(w[attr]): if opr and item in l_set and not (h_if and self._is_w_same(w[attr], h_if, item)): commands.append( cmd + (" " + attr.replace("_", "-") + " " + item.replace("_", "-") + " " + val), ) elif not opr and item in l_set and not (h_if and self._in_target(h_if, item)): commands.append( cmd + (" " + attr.replace("_", "-") + " " + item.replace("_", "-")), ) return commands def _add_time(self, attr, w, h, cmd, opr): """ This function forms the commands for 'time' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ commands = [] h_time = {} l_set = ( "utc", "stopdate", "stoptime", "weekdays", "monthdays", "startdate", "starttime", ) if w[attr]: if h and attr in h.keys(): h_time = h.get(attr) or {} for item, val in iteritems(w[attr]): if ( opr and item in l_set and not (h_time and self._is_w_same(w[attr], h_time, item)) ): if item == "utc": if not (not val and (not h_time or item not in h_time)): commands.append(cmd + (" " + attr + " " + item)) else: commands.append(cmd + (" " + attr + " " + item + " " + val)) elif ( not opr and item in l_set and not (h_time and self._is_w_same(w[attr], h_time, item)) ): commands.append(cmd + (" " + attr + " " + item)) return commands def _add_tcp_1_4(self, attr, w, h, cmd, opr): """ This function forms the commands for 'tcp' attributes based on the 'opr'. Version 1.4+ :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. 
""" commands = [] have = [] key = "flags" want = [] if w: if w.get(attr): want = w.get(attr).get(key) or [] if h: if h.get(attr): have = h.get(attr).get(key) or [] if want: if opr: flags = list_diff_want_only(want, have) for flag in flags: invert = flag.get("invert", False) commands.append( cmd + (" " + attr + " flags " + ("not " if invert else "") + flag["flag"]), ) elif not opr: flags = list_diff_want_only(want, have) for flag in flags: invert = flag.get("invert", False) commands.append( cmd + (" " + attr + " flags " + ("not " if invert else "") + flag["flag"]), ) return commands def _add_packet_length(self, attr, w, h, cmd, opr): """ This function forms the commands for 'packet_length[_exclude]' attributes based on the 'opr'. If < 1.4, handle tcp attributes. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ commands = [] have = [] want = [] if w: if w.get(attr): want = w.get(attr) or [] if h: if h.get(attr): have = h.get(attr) or [] attr = attr.replace("_", "-") if want: if opr: lengths = list_diff_want_only(want, have) for l_rec in lengths: commands.append(cmd + " " + attr + " " + str(l_rec["length"])) elif not opr: lengths = list_diff_want_only(want, have) for l_rec in lengths: commands.append(cmd + " " + attr + " " + str(l_rec["length"])) return commands def _tcp_flags_string(self, flags): """ This function forms the tcp flags string. :param flags: flags list. :return: flags string or None. """ if not flags: return "" flag_str = "" for flag in flags: this_flag = flag["flag"].upper() if flag.get("invert", False): this_flag = "!" + this_flag if len(flag_str) > 0: flag_str = ",".join([flag_str, this_flag]) else: flag_str = this_flag return flag_str def _add_tcp(self, attr, w, h, cmd, opr): """ This function forms the commands for 'tcp' attributes based on the 'opr'. If < 1.4, handle tcp attributes. :param attr: attribute name. :param w: base config. 
:param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): return self._add_tcp_1_4(attr, w, h, cmd, opr) h_tcp = {} commands = [] if w[attr]: key = "flags" flags = w[attr].get(key) or {} if flags: if h and key in h[attr].keys(): h_tcp = h[attr].get(key) or {} if flags: flag_str = self._tcp_flags_string(flags) if opr and not (h_tcp and flags == h_tcp): commands.append(cmd + (" " + attr + " " + "flags" + " " + flag_str)) if not opr and not (h_tcp and flags == h_tcp): commands.append(cmd + (" " + attr + " " + "flags" + " " + flag_str)) return commands def _add_limit(self, attr, w, h, cmd, opr): """ This function forms the commands for 'limit' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ h_limit = {} commands = [] if w[attr]: key = "burst" if ( opr and key in w[attr].keys() and not (h and attr in h.keys() and self._is_w_same(w[attr], h[attr], key)) ): commands.append(cmd + (" " + attr + " " + key + " " + str(w[attr].get(key)))) elif ( not opr and key in w[attr].keys() and not (h and attr in h.keys() and self._in_target(h[attr], key)) ): commands.append(cmd + (" " + attr + " " + key + " " + str(w[attr].get(key)))) key = "rate" rate = w[attr].get(key) or {} if rate: if h and key in h[attr].keys(): h_limit = h[attr].get(key) or {} if "unit" in rate and "number" in rate: if opr and not ( h_limit and self._is_w_same(rate, h_limit, "unit") and self._is_w_same(rate, h_limit, "number") ): commands.append( cmd + ( " " + attr + " " + key + " " + str(rate["number"]) + "/" + rate["unit"] ), ) if not opr and not ( h_limit and self._is_w_same(rate, h_limit, "unit") and self._is_w_same(rate, h_limit, "number") ): commands.append(cmd + (" " + attr + " " + key)) return commands def _add_src_or_dest(self, attr, w, h, cmd, opr=True): """ 
This function forms the commands for 'src/dest' attributes based on the 'opr'. :param attr: attribute name. :param w: base config. :param h: target config. :param cmd: commands to be prepend. :return: generated list of commands. """ commands = [] h_group = {} g_set = ("port_group", "address_group", "network_group") if w[attr]: keys = ("address", "mac_address", "port") for key in keys: if ( opr and key in w[attr].keys() and not (h and attr in h.keys() and self._is_w_same(w[attr], h[attr], key)) ): commands.append( cmd + (" " + attr + " " + key.replace("_", "-") + " " + w[attr].get(key)), ) elif ( not opr and key in w[attr].keys() and not (h and attr in h.keys() and self._in_target(h[attr], key)) ): commands.append(cmd + (" " + attr + " " + key)) key = "group" group = w[attr].get(key) or {} if group: h_group = {} if h and h.get(attr) and key in h[attr].keys(): h_group = h[attr].get(key) for item, val in iteritems(group): if val: if ( opr and item in g_set and not (h_group and self._is_w_same(group, h_group, item)) ): commands.append( cmd + ( " " + attr + " " + key + " " + item.replace("_", "-") + " " + val ), ) elif ( not opr and item in g_set and not (h_group and self._in_target(h_group, item)) ): commands.append( cmd + (" " + attr + " " + key + " " + item.replace("_", "-")), ) return commands def search_rules_in_have_rs(self, have_rules, r_number): """ This function returns the rule if it is present in target config. :param have: target config. :param rs_id: rule-set identifier. :param r_number: rule-number. :return: rule. """ if have_rules: key = "number" for r in have_rules: if key in r and r[key] == r_number: return r return None def search_r_sets_in_have(self, have, rs_id, type="rule_sets"): """ This function returns the rule-set/rule if it is present in target config. :param have: target config. :param rs_id: rule-identifier. :param type: rule_sets if searching a rule_set and r_list if searching from a rule_list. :return: rule-set/rule. 
""" if "afi" in rs_id: afi = rs_id["afi"] else: afi = None if rs_id["filter"]: key = "filter" w_value = rs_id["filter"] elif rs_id["name"]: key = "name" w_value = rs_id["name"] else: raise ValueError("id must be specific to name or filter") if type not in ("r_list", "rule_sets"): raise ValueError("type must be rule_sets or r_list") if have: if type == "r_list": for h in have: if h["afi"] == afi: r_sets = self._get_r_sets(h) for rs in r_sets: if key in rs and rs[key] == w_value: return rs else: # searching a ruleset for rs in have: if key in rs and rs[key] == w_value: return rs return None def _get_r_sets(self, item): """ This function returns the list of rule-sets. :param item: config dictionary. :return: list of rule-sets/rules. """ rs_list = [] type = "rule_sets" r_sets = item[type] if r_sets: for rs in r_sets: rs_list.append(rs) return rs_list def _compute_command( self, rs_id, number=None, attrib=None, value=None, remove=False, opr=True, ): """ This function construct the add/delete command based on passed attributes. :param rs_id: rule-set identifier. :param number: rule-number. :param attrib: attribute name. :param value: value. :param remove: True if delete command needed to be construct. :param opr: operation flag. :return: generated command. 
""" if rs_id["name"] and rs_id["filter"]: raise ValueError("name and filter cannot be used together") if remove or not opr: cmd = "delete firewall " + self._get_fw_type(rs_id["afi"]) else: cmd = "set firewall " + self._get_fw_type(rs_id["afi"]) if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): if rs_id["name"]: cmd += " name " + rs_id["name"] elif rs_id["filter"]: cmd += " " + rs_id["filter"] + " filter" elif rs_id["name"]: cmd += " " + rs_id["name"] if number: cmd += " rule " + str(number) if attrib: if ( LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4") and attrib == "enable_default_log" ): cmd += " " + "default-log" else: cmd += " " + attrib.replace("_", "-") if value and opr and attrib != "enable_default_log" and attrib != "disable": cmd += " '" + str(value) + "'" return cmd def _add_r_base_attrib(self, rs_id, attr, rule, opr=True): """ This function forms the command for 'rules' attributes which doesn't have further sub attributes. :param rs_id: rule-set identifier. :param attrib: attribute name :param rule: rule config dictionary. :param opr: True/False. :return: generated command. """ if attr == "number": command = self._compute_command(rs_id, number=rule["number"], opr=opr) else: command = self._compute_command( rs_id=rs_id, number=rule["number"], attrib=attr, value=rule[attr], opr=opr, ) return command def _rs_id(self, have, afi, name=None, filter=None): """ This function returns the rule-set identifier based on the example rule, overriding the components as specified. :param have: example rule. :param afi: address type. :param name: rule-set name. :param filter: filter name. :return: rule-set identifier. 
""" identifier = {"name": None, "filter": None} if afi: identifier["afi"] = afi else: raise ValueError("afi must be provided") if name: identifier["name"] = name return identifier elif filter: identifier["filter"] = filter return identifier if have: if "name" in have and have["name"]: identifier["name"] = have["name"] return identifier if "filter" in have and have["filter"]: identifier["filter"] = have["filter"] return identifier # raise ValueError("name or filter must be provided or present in have") # unless we want a wildcard return identifier def _add_rs_base_attrib(self, rs_id, attrib, rule, opr=True): """ This function forms the command for 'rule-sets' attributes which don't have further sub attributes. :param rs_id: rule-set identifier. :param attrib: attribute name :param rule: rule config dictionary. :param opr: True/False. :return: generated command. """ command = self._compute_command( rs_id=rs_id, attrib=attrib, value=rule[attrib], opr=opr, ) return command def _bool_to_str(self, val): """ This function converts the bool value into string. :param val: bool value. :return: enable/disable. """ return "enable" if val else "disable" def _get_fw_type(self, afi): """ This function returns the firewall rule-set type based on IP address. :param afi: address type :return: rule-set type. """ if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"): return "ipv6" if afi == "ipv6" else "ipv4" return "ipv6-name" if afi == "ipv6" else "name" def _is_del(self, l_set, h, key="number"): """ This function checks whether rule needs to be deleted based on the rule number. :param l_set: attribute set. :param h: target config. :param key: number. :return: True/False. """ return key in l_set and not (h and self._in_target(h, key)) def _is_w_same(self, w, h, key): """ This function checks whether the key value is same in base and target config dictionary. :param w: base config. :param h: target config. :param key:attribute name. :return: True/False. 
""" return True if h and key in h and h[key] == w[key] else False def _in_target(self, h, key): """ This function checks whether the target exists and key present in target config. :param h: target config. :param key: attribute name. :return: True/False. """ return True if h and key in h else False def _prune_stubs(self, rs): if isinstance(rs, list): for item in rs: self._prune_stubs(item) elif isinstance(rs, dict): keys_to_remove = [ key for key, value in rs.items() if ( (key == "disable" and value is False) or ( key == "log" and value == "disable" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4") ) or ( key in ["new", "invalid", "related", "established"] and value is False and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4") ) ) ] for key in keys_to_remove: del rs[key] for key in rs: self._prune_stubs(rs[key]) def _is_same_rs(self, w, rs): if isinstance(w, dict) and isinstance(rs, dict): if w.keys() != rs.keys(): return False for key in w: if not self._is_same_rs(w[key], rs[key]): return False return True elif isinstance(w, list) and isinstance(rs, list): try: - sorted_list1 = sorted(w, key=lambda x: str(x)) # pylint: disable=unnecessary-lambda - sorted_list2 = sorted( - rs, - key=lambda x: str(x), # pylint: disable=unnecessary-lambda - ) + def comparison(x): + if 'name' in x: + return x['name'] + if 'number' in x: + return x['number'] + return str(x) + + sorted_list1 = sorted(w, key=comparison) + sorted_list2 = sorted(rs, key=comparison) except TypeError: return False + if len(sorted_list1) != len(sorted_list2): + return False return all(self._is_same_rs(x, y) for x, y in zip(sorted_list1, sorted_list2)) else: return w == rs diff --git a/plugins/module_utils/network/vyos/vyos.py b/plugins/module_utils/network/vyos/vyos.py index 1430b1b1..f37294e6 100644 --- a/plugins/module_utils/network/vyos/vyos.py +++ b/plugins/module_utils/network/vyos/vyos.py 
@@ -1,108 +1,109 @@ # This code is part of Ansible, but is an independent component. # This particular file snippet, and this file snippet only, is BSD licensed. # Modules you write using this snippet, which is embedded dynamically by Ansible # still belong to the author of the module, and may assign their own license # to the complete work. # # (c) 2016 Red Hat Inc. # # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
# from __future__ import absolute_import, division, print_function __metaclass__ = type import json from ansible.module_utils._text import to_text from ansible.module_utils.connection import Connection, ConnectionError + _DEVICE_CONFIGS = {} def get_connection(module): if hasattr(module, "_vyos_connection"): return module._vyos_connection capabilities = get_capabilities(module) network_api = capabilities.get("network_api") if network_api == "cliconf": module._vyos_connection = Connection(module._socket_path) else: module.fail_json(msg="Invalid connection type %s" % network_api) return module._vyos_connection def get_capabilities(module): if hasattr(module, "_vyos_capabilities"): return module._vyos_capabilities try: capabilities = Connection(module._socket_path).get_capabilities() except ConnectionError as exc: module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) module._vyos_capabilities = json.loads(capabilities) return module._vyos_capabilities def get_config(module, flags=None, format=None): flags = [] if flags is None else flags global _DEVICE_CONFIGS - if _DEVICE_CONFIGS != {}: - return _DEVICE_CONFIGS + if _DEVICE_CONFIGS != {} or module.params["match"] == "none": + return to_text(_DEVICE_CONFIGS) else: connection = get_connection(module) try: out = connection.get_config(flags=flags, format=format) except ConnectionError as exc: module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) cfg = to_text(out, errors="surrogate_then_replace").strip() _DEVICE_CONFIGS = cfg return cfg def run_commands(module, commands, check_rc=True): connection = get_connection(module) try: response = connection.run_commands(commands=commands, check_rc=check_rc) except ConnectionError as exc: module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return response def load_config(module, commands, commit=False, comment=None): connection = get_connection(module) try: response = connection.edit_config(candidate=commands, commit=commit, 
comment=comment) except ConnectionError as exc: module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return response.get("diff") def get_os_version(module): connection = get_connection(module) if connection.get_device_info(): os_version = connection.get_device_info()["network_os_major_version"] return os_version diff --git a/plugins/modules/vyos_config.py b/plugins/modules/vyos_config.py index 75c062aa..6cc4f9f5 100644 --- a/plugins/modules/vyos_config.py +++ b/plugins/modules/vyos_config.py 
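Review bot: The new early return in get_config sends back `to_text(_DEVICE_CONFIGS)` when `match == "none"` and the cache is still empty, so the caller (`run()` in vyos_config.py, which passes the value to get_diff as `running`) receives the literal string `"{}"` instead of an empty config; this presumably only works because the cliconf ignores `running` for diff_match none, and on a cache hit it needlessly re-converts an already-text value. `module.params["match"]` will also raise KeyError for any caller whose argument_spec has no `match` option, since get_config is a shared module_utils helper rather than vyos_config-specific. Suggest a separate guard ahead of the cache check, `if module.params.get("match") == "none": return ""`, and keep the cache branch as `if _DEVICE_CONFIGS: return _DEVICE_CONFIGS` so the cached str is returned unchanged; the module's remaining functions continue unchanged below.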
@@ -1,393 +1,394 @@ #!/usr/bin/python # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = """ module: vyos_config author: Nathaniel Case (@Qalthos) short_description: Manage VyOS configuration on remote device description: - This module provides configuration file management of VyOS devices. It provides arguments for managing both the configuration file and state of the active configuration. All configuration statements are based on `set` and `delete` commands in the device configuration. version_added: 1.0.0 extends_documentation_fragment: - vyos.vyos.vyos notes: - Tested against VyOS 1.1.8 (helium). - This module works with connection C(ansible.netcommon.network_cli). See L(the VyOS OS Platform Options,../network/user_guide/platform_vyos.html). - To ensure idempotency and correct diff the configuration lines in the relevant module options should be similar to how they appear if present in the running configuration on device including the indentation. options: lines: description: - The ordered set of commands that should be configured in the section. The commands must be the exact same commands as found in the device running-config as found in the device running-config to ensure idempotency and correct diff. 
Be sure to note the configuration command syntax as some commands are automatically modified by the device config parser. type: list elements: str src: description: - The C(src) argument specifies the path to the source config file to load. The source config file can either be in bracket format or set format. The source file can include Jinja2 template variables. The configuration lines in the source file should be similar to how it will appear if present in the running-configuration of the device including indentation to ensure idempotency and correct diff. type: path match: description: - The C(match) argument controls the method used to match against the current active configuration. By default, the configuration commands config are matched against the active config and the deltas are loaded line by line. If the C(match) argument is set to C(none) the active configuration is ignored and the configuration is always loaded. If the C(match) argument is set to C(smart) the active configuration and the target configuration are compared and differences are added to or removed from the target. Using C(smart), the special value C(...) indicates that this value should not be changed and its siblings should not be removed from the target. type: str default: line choices: - line - smart - none backup: description: - The C(backup) argument will backup the current devices active configuration to the Ansible control host prior to making any changes. If the C(backup_options) value is not given, the backup file will be located in the backup folder in the playbook root directory or role root directory, if playbook is part of an ansible role. If the directory does not exist, it is created. type: bool default: no comment: description: - Allows a commit description to be specified to be included when the configuration is committed. If the configuration is not changed or committed, this argument is ignored. 
default: configured by vyos_config type: str config: description: - The C(config) argument specifies the base configuration to use to compare against the desired configuration. If this value is not specified, the module will automatically retrieve the current active configuration from the remote device. The configuration lines in the option value should be similar to how it will appear if present in the running-configuration of the device including indentation to ensure idempotency and correct diff. type: str save: description: - The C(save) argument controls whether or not changes made to the active configuration are saved to disk. This is independent of committing the config. When set to True, the active configuration is saved. type: bool default: no backup_options: description: - This is a dict object containing configurable options related to backup file path. The value of this option is read only when C(backup) is set to I(yes), if C(backup) is set to I(no) this option will be silently ignored. suboptions: filename: description: - The filename to be used to store the backup configuration. If the filename is not given it will be generated based on the hostname, current time and date in format defined by _config.@ type: str dir_path: description: - This option provides the path ending with directory name in which the backup configuration file will be stored. If the directory does not exist it will be first created and the filename is either the value of C(filename) or default filename as described in C(filename) options description. If the path value is not given in that case a I(backup) directory will be created in the current working directory and backup configuration will be copied in C(filename) within I(backup) directory. 
type: path type: dict """ EXAMPLES = """ - name: configure the remote device vyos.vyos.vyos_config: lines: - set system host-name {{ inventory_hostname }} - set service lldp - delete service dhcp-server - name: backup and load from file vyos.vyos.vyos_config: src: vyos.cfg backup: true - name: render a Jinja2 template onto the VyOS router vyos.vyos.vyos_config: match: smart src: vyos_template.j2 - name: for idempotency, use full-form commands vyos.vyos.vyos_config: lines: # - set int eth eth2 description 'OUTSIDE' - set interface ethernet eth2 description 'OUTSIDE' - name: configurable backup path vyos.vyos.vyos_config: backup: true backup_options: filename: backup.cfg dir_path: /home/user """ RETURN = """ commands: description: The list of configuration commands sent to the device returned: always type: list sample: ['...', '...'] filtered: description: The list of configuration commands removed to avoid a load failure returned: always type: list sample: ['...', '...'] backup_path: description: The full path to the backup file returned: when backup is yes type: str sample: /playbooks/ansible/backup/vyos_config.2016-07-16@22:28:34 filename: description: The name of the backup file returned: when backup is yes and filename is not specified in backup options type: str sample: vyos_config.2016-07-16@22:28:34 shortname: description: The full path to the backup file excluding the timestamp returned: when backup is yes and filename is not specified in backup options type: str sample: /playbooks/ansible/backup/vyos_config date: description: The date extracted from the backup file name returned: when backup is yes type: str sample: "2016-07-16" time: description: The time extracted from the backup file name returned: when backup is yes type: str sample: "22:28:34" """ import re from ansible.module_utils._text import to_text from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.connection import ConnectionError from 
ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.vyos import ( get_config, get_connection, load_config, run_commands, ) DEFAULT_COMMENT = "configured by vyos_config" CONFIG_FILTERS = [ re.compile(r"set system login user \S+ authentication encrypted-password"), ] def get_candidate(module): contents = module.params["src"] or module.params["lines"] if module.params["src"]: contents = contents.splitlines() if len(contents) > 0: line = contents[0].split() if len(line) > 0 and line[0] in ("set", "delete"): contents = format_commands(contents) contents = "\n".join(contents) return contents def format_commands(commands): """ This function format the input commands and removes the prepend white spaces for command lines having 'set' or 'delete' and it skips empty lines. :param commands: :return: list of commands """ return [ line.strip() if line.split()[0] in ("set", "delete") else line for line in commands if len(line.strip()) > 0 ] def diff_config(commands, config): config = [str(c).replace("'", "") for c in config.splitlines()] updates = list() visited = set() for line in commands: item = str(line).replace("'", "") if not item.startswith("set") and not item.startswith("delete"): raise ValueError("line must start with either `set` or `delete`") elif item.startswith("set") and item not in config: updates.append(line) elif item.startswith("delete"): if not config: updates.append(line) else: item = re.sub(r"delete", "set", item) for entry in config: if entry.startswith(item) and line not in visited: updates.append(line) visited.add(line) return list(updates) def sanitize_config(config, result): result["filtered"] = list() index_to_filter = list() for regex in CONFIG_FILTERS: for index, line in enumerate(list(config)): if regex.search(line): result["filtered"].append(line) index_to_filter.append(index) # Delete all filtered configs for filter_index in sorted(index_to_filter, reverse=True): del config[filter_index] def run(module, result): # get the current 
active config from the node or passed in via # the config param + config = module.params["config"] or get_config(module) # create the candidate config object from the arguments candidate = get_candidate(module) # create loadable config that includes only the configuration updates connection = get_connection(module) try: response = connection.get_diff( candidate=candidate, running=config, diff_match=module.params["match"], ) except ConnectionError as exc: module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) commands = response.get("config_diff") sanitize_config(commands, result) result["commands"] = commands commit = not module.check_mode comment = module.params["comment"] diff = None if commands: diff = load_config(module, commands, commit=commit, comment=comment) if result.get("filtered"): result["warnings"].append( "Some configuration commands were removed, please see the filtered key", ) result["changed"] = True if module._diff: result["diff"] = {"prepared": diff} def main(): backup_spec = dict(filename=dict(), dir_path=dict(type="path")) argument_spec = dict( src=dict(type="path"), lines=dict(type="list", elements="str"), match=dict(default="line", choices=["line", "smart", "none"]), comment=dict(default=DEFAULT_COMMENT), config=dict(), backup=dict(type="bool", default=False), backup_options=dict(type="dict", options=backup_spec), save=dict(type="bool", default=False), ) mutually_exclusive = [("lines", "src")] module = AnsibleModule( argument_spec=argument_spec, mutually_exclusive=mutually_exclusive, supports_check_mode=True, ) warnings = list() result = dict(changed=False, warnings=warnings) if module.params["backup"]: result["__backup__"] = get_config(module=module) if any((module.params["src"], module.params["lines"])): run(module, result) if module.params["save"]: diff = run_commands(module, commands=["configure", "compare saved"])[1] if diff not in { "[edit]", - "No changes between working and saved configurations.\n\n[edit]" + "No changes 
between working and saved configurations.\n\n[edit]", }: if not module.check_mode: run_commands(module, commands=["save"]) result["changed"] = True run_commands(module, commands=["exit"]) if result.get("changed") and any((module.params["src"], module.params["lines"])): msg = ( "To ensure idempotency and correct diff the input configuration lines should be" " similar to how they appear if present in" " the running configuration on device" ) if module.params["src"]: msg += " including the indentation" if "warnings" in result: result["warnings"].append(msg) else: result["warnings"] = msg module.exit_json(**result) if __name__ == "__main__": main() diff --git a/tests/integration/targets/vyos_config/tests/cli/save.yaml b/tests/integration/targets/vyos_config/tests/cli/save.yaml index e8a9035b..b39ef957 100644 --- a/tests/integration/targets/vyos_config/tests/cli/save.yaml +++ b/tests/integration/targets/vyos_config/tests/cli/save.yaml 
@@ -1,54 +1,54 @@ --- - debug: msg="START cli/save.yaml on connection={{ ansible_connection }}" - name: setup vyos.vyos.vyos_config: lines: set system host-name {{ inventory_hostname_short }} match: none -- name: configure hostaname and save +- name: configure hostname and save register: result vyos.vyos.vyos_config: lines: set system host-name foo save: true - assert: that: - result.changed == true - "'set system host-name foo' in result.commands" -- name: configure hostaname and don't save +- name: configure hostname and don't save register: result vyos.vyos.vyos_config: lines: set system host-name bar - assert: that: - result.changed == true - "'set system host-name bar' in result.commands" - name: save config register: result vyos.vyos.vyos_config: save: true - assert: that: - result.changed == true - name: save config again register: result vyos.vyos.vyos_config: save: true - assert: that: - result.changed == false - name: teardown vyos.vyos.vyos_config: lines: set system host-name {{ inventory_hostname_short }} match: none save: true - debug: msg="END cli/simple.yaml on connection={{ ansible_connection }}" diff --git a/tests/integration/targets/vyos_config/tests/cli/simple.yaml b/tests/integration/targets/vyos_config/tests/cli/simple.yaml index 3db59270..1559fa2b 100644 --- a/tests/integration/targets/vyos_config/tests/cli/simple.yaml +++ b/tests/integration/targets/vyos_config/tests/cli/simple.yaml 
@@ -1,53 +1,64 @@ --- - debug: msg="START cli/simple.yaml on connection={{ ansible_connection }}" - name: setup vyos.vyos.vyos_config: lines: set system host-name {{ inventory_hostname_short }} match: none - name: configure simple config command register: result vyos.vyos.vyos_config: lines: set system host-name foo - assert: that: - result.changed == true - "'set system host-name foo' in result.commands" - name: check simple config command idempontent register: result vyos.vyos.vyos_config: lines: set system host-name foo - assert: that: - result.changed == false +- name: configure simple config command while match = 'none' + register: result + vyos.vyos.vyos_config: + lines: set system host-name foo + match: none + +- assert: + that: + - result.changed == true + - "'set system host-name foo' in result.commands" + - name: Delete services vyos.vyos.vyos_config: &id001 lines: - delete service lldp - delete protocols static - name: Configuring when commands starts with whitespaces register: result vyos.vyos.vyos_config: src: "{{ role_path }}/tests/cli/config.cfg" - assert: that: - result.changed == true - '"set service lldp" in result.commands' - '"set protocols static" in result.commands' - name: Delete services vyos.vyos.vyos_config: *id001 - name: teardown vyos.vyos.vyos_config: lines: set system host-name {{ inventory_hostname_short }} match: none - debug: msg="END cli/simple.yaml on connection={{ ansible_connection }}" diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/_populate.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/_populate.yaml index 45bd9b6a..cbc994ba 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/_populate.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/_populate.yaml 
@@ -1,15 +1,17 @@ --- - name: ensure facts include_tasks: _get_version.yaml - ansible.builtin.include_tasks: _remove_config.yaml - name: Setup ansible.netcommon.cli_config: config: |- {% for intf in ('eth1','eth2') %} set interfaces ethernet "{{ intf }}" description 'Configured by Ansible' + set interfaces ethernet "{{ intf }}" speed 'auto' + set interfaces ethernet "{{ intf }}" duplex 'auto' set interfaces ethernet "{{ intf }}" mtu '1500' set interfaces ethernet "{{ intf }}" vif 200 set interfaces ethernet "{{ intf }}" vif 200 description 'VIF - 200' {% endfor %} diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/deleted.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/deleted.yaml index 620bf53f..9f0734b4 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/deleted.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/deleted.yaml 
@@ -1,46 +1,46 @@ --- - debug: msg: Start vyos_interfaces deleted integration tests ansible_connection={{ ansible_connection }} - include_tasks: _populate.yaml - block: - name: Delete attributes of given interfaces register: result vyos.vyos.vyos_interfaces: &id001 config: - name: eth1 - name: eth2 state: deleted - name: Assert that the before dicts were correctly generated assert: that: - - "{{ populate | symmetric_difference(result['before']) |length == 0 }}" + - populate | symmetric_difference(result['before']) |length == 0 - name: Assert that the correct set of commands were generated assert: that: - - "{{ deleted['commands'] | symmetric_difference(result['commands']) |length == 0 }}" + - deleted['commands'] | symmetric_difference(result['commands']) |length == 0 - name: Assert that the after dicts were correctly generated assert: that: - - "{{ deleted['after'] | symmetric_difference(result['after']) |length == 0 }}" + - deleted['after'] | symmetric_difference(result['after']) |length == 0 - name: Delete attributes of given interfaces (IDEMPOTENT) register: result vyos.vyos.vyos_interfaces: *id001 - name: Assert that the previous task was idempotent assert: that: - result.changed == false - name: Assert that the before dicts were correctly generated assert: that: - - "{{ deleted['after'] | symmetric_difference(result['before']) |length == 0 }}" + - deleted['after'] | symmetric_difference(result['before']) |length == 0 always: - include_tasks: _remove_config.yaml diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/gathered.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/gathered.yaml index 88e53762..46a0e166 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/gathered.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/gathered.yaml 
@@ -1,20 +1,20 @@ --- - debug: msg: START vyos_interfaces gathered integration tests on connection={{ ansible_connection }} - include_tasks: _populate.yaml - block: - name: Gather the provided configuration with the existing running configuration register: result vyos.vyos.vyos_interfaces: config: state: gathered - name: Assert that gathered dicts was correctly generated assert: that: - - "{{ populate | symmetric_difference(result['gathered']) |length == 0 }}" + - populate | symmetric_difference(result['gathered']) |length == 0 always: - include_tasks: _remove_config.yaml diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/merged.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/merged.yaml index 5c719b39..decdeca2 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/merged.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/merged.yaml 
@@ -1,57 +1,59 @@ --- - debug: msg: START vyos_interfaces merged integration tests on connection={{ ansible_connection }} - include_tasks: _remove_config.yaml - block: - name: Merge the provided configuration with the existing running configuration register: result vyos.vyos.vyos_interfaces: &id001 config: - name: eth1 description: Configured by Ansible - Interface 1 mtu: 1500 + speed: auto + duplex: auto vifs: - vlan_id: 100 description: Eth1 - VIF 100 mtu: 1404 - vlan_id: 101 description: Eth1 - VIF 101 - name: eth2 description: Configured by Ansible - Interface 2 (ADMIN DOWN) mtu: 1406 enabled: false state: merged - name: Assert that before dicts were correctly generated assert: - that: "{{ merged['before'] | symmetric_difference(result['before']) |length == 0 }}" + that: merged['before'] | symmetric_difference(result['before']) |length == 0 - name: Assert that correct set of commands were generated assert: that: - - "{{ merged['commands'] | symmetric_difference(result['commands']) |length == 0 }}" + - merged['commands'] | symmetric_difference(result['commands']) |length == 0 - name: Assert that after dicts was correctly generated assert: that: - - "{{ merged['after'] | symmetric_difference(result['after']) |length == 0 }}" + - merged['after'] | symmetric_difference(result['after']) |length == 0 - name: Merge the provided configuration with the existing running configuration (IDEMPOTENT) register: result vyos.vyos.vyos_interfaces: *id001 - name: Assert that the previous task was idempotent assert: that: - result['changed'] == false - name: Assert that before dicts were correctly generated assert: that: - - "{{ merged['after'] | symmetric_difference(result['before']) |length == 0 }}" + - merged['after'] | symmetric_difference(result['before']) |length == 0 always: - include_tasks: _remove_config.yaml 
diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/overridden.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/overridden.yaml index 7e86d3e4..5d87ab68 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/overridden.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/overridden.yaml @@ -1,50 +1,50 @@ --- - debug: msg: START vyos_interfaces overridden integration tests on connection={{ ansible_connection }} - include_tasks: _populate.yaml - block: - name: Overrides all device configuration with provided configuration register: result vyos.vyos.vyos_interfaces: &id001 config: - name: eth0 speed: auto duplex: auto - name: eth2 description: Overridden by Ansible mtu: 1402 state: overridden - name: Assert that before dicts were correctly generated assert: that: - - "{{ populate | symmetric_difference(result['before']) |length == 0 }}" + - populate | symmetric_difference(result['before']) |length == 0 - name: Assert that correct commands were generated assert: that: - - "{{ overridden['commands'] | symmetric_difference(result['commands']) |length == 0 }}" + - overridden['commands'] | symmetric_difference(result['commands']) |length == 0 - name: Assert that after dicts were correctly generated assert: that: - - "{{ overridden['after'] | symmetric_difference(result['after']) |length == 0 }}" + - overridden['after'] | symmetric_difference(result['after']) |length == 0 - name: Overrides all device configuration with provided configurations (IDEMPOTENT) register: result vyos.vyos.vyos_interfaces: *id001 - name: Assert that the previous task was idempotent assert: that: - result['changed'] == false - name: Assert that before dicts were correctly generated assert: that: - - "{{ overridden['after'] | symmetric_difference(result['before']) |length == 0 }}" + - overridden['after'] | symmetric_difference(result['before']) |length == 0 always: - include_tasks: _remove_config.yaml 
diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/parsed.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/parsed.yaml index 0ebfd322..329d6b50 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/parsed.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/parsed.yaml @@ -1,14 +1,14 @@ --- - debug: msg: START vyos_interfaces parsed integration tests on connection={{ ansible_connection }} - name: Parse externally provided interfaces config to agnostic model register: result vyos.vyos.vyos_interfaces: running_config: "{{ lookup('file', '_parsed_config.cfg') }}" state: parsed - name: Assert that config was correctly parsed assert: that: - - "{{ parsed['after'] | symmetric_difference(result['parsed']) |length == 0 }}" + - parsed['after'] | symmetric_difference(result['parsed']) |length == 0 diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/rendered.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/rendered.yaml index c03347fa..7b66c50e 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/rendered.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/rendered.yaml 
@@ -1,37 +1,41 @@ --- - debug: msg: START vyos_interfaces rendered integration tests on connection={{ ansible_connection }} - include_tasks: _populate.yaml - block: - name: Structure provided configuration into device specific commands register: result vyos.vyos.vyos_interfaces: config: - name: eth0 enabled: true + duplex: auto + speed: auto - name: eth1 description: Configured by Ansible - Interface 1 mtu: 1500 + duplex: auto + speed: auto enabled: true vifs: - vlan_id: 100 description: Eth1 - VIF 100 mtu: 1404 enabled: true - vlan_id: 101 description: Eth1 - VIF 101 enabled: true - name: eth2 description: Configured by Ansible - Interface 2 (ADMIN DOWN) mtu: 1406 enabled: false state: rendered - name: Assert that correct set of commands were generated assert: that: - "{{ rendered['commands'] | symmetric_difference(result['rendered']) |length == 0 }}" always: - include_tasks: _remove_config.yaml diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/replaced.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/replaced.yaml index 9d0a3a8b..5cfa4523 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/replaced.yaml +++ b/tests/integration/targets/vyos_interfaces/tests/cli/replaced.yaml 
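Review bot: The final assert in rendered.yaml still wraps its expression in `"{{ rendered['commands'] | symmetric_difference(result['rendered']) |length == 0 }}"`, the templated form that this same commit converts to bare expressions in deleted.yaml, gathered.yaml, merged.yaml, overridden.yaml, parsed.yaml and rtt.yaml; leaving one file on the old style undoes the consistency the change is after and keeps the `{{ }}`-in-assert form that ansible-lint flags. Change it to `- rendered['commands'] | symmetric_difference(result['rendered']) |length == 0` to match the other test files.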
@@ -1,66 +1,60 @@ --- - debug: msg: START vyos_interfaces replaced integration tests on connection={{ ansible_connection }} - include_tasks: _populate.yaml - block: - name: Replace device configurations of listed interfaces with provided configurations register: result vyos.vyos.vyos_interfaces: &id001 config: - name: eth1 description: Replaced by Ansible vifs: - vlan_id: 100 description: VIF 100 - Replaced by Ansible - name: eth2 mtu: 1400 description: Replaced by Ansible state: replaced - name: Assert that correct set of commands were generated assert: that: - replaced['commands'] | symmetric_difference(result['commands']) |length == 0 - - debug: - var: populate | symmetric_difference(result['before']) - - name: Assert that before dicts are correctly generated assert: that: - populate | symmetric_difference(result['before']) |length == 0 - - debug: - var: replaced['after'] | symmetric_difference(result['after']) - - name: Assert that after dict is correctly generated assert: that: - replaced['after'] | symmetric_difference(result['after']) |length == 0 - vyos.vyos.vyos_facts: gather_network_resources: interfaces - name: Assert that the facts and the after dict are the same assert: that: - result.after == ansible_facts['network_resources']['interfaces'] - name: Replace device configurations of listed interfaces with provided configurarions (IDEMPOTENT) register: result vyos.vyos.vyos_interfaces: *id001 - name: Assert that task was idempotent assert: that: - result['changed'] == false - name: Assert that before dict is correctly generated assert: that: - replaced['after'] | symmetric_difference(result['before']) |length == 0 always: - include_tasks: _remove_config.yaml diff --git a/tests/integration/targets/vyos_interfaces/tests/cli/rtt.yaml b/tests/integration/targets/vyos_interfaces/tests/cli/rtt.yaml index cfe1b0f0..e6753cba 100644 --- a/tests/integration/targets/vyos_interfaces/tests/cli/rtt.yaml 
+++ b/tests/integration/targets/vyos_interfaces/tests/cli/rtt.yaml @@ -1,77 +1,79 @@ --- - debug: msg: START vyos_interfaces round trip integration tests on connection={{ ansible_connection }} - include_tasks: _remove_config.yaml - block: - name: Apply the provided configuration (base config) register: base_config vyos.vyos.vyos_interfaces: config: - name: eth0 enabled: true + duplex: auto + speed: auto - name: eth1 description: Interface - 1 mtu: 1500 vifs: - vlan_id: 100 description: Eth1 - VIF 100 mtu: 1402 - vlan_id: 101 enabled: false mtu: 1401 - name: eth2 description: Interface - 2 enabled: true mtu: 1409 state: merged - name: Gather interfaces facts vyos.vyos.vyos_facts: gather_subset: - default gather_network_resources: - interfaces - name: Apply the provided configuration (config to be reverted) register: result vyos.vyos.vyos_interfaces: config: - name: eth1 description: Interface 1 - Description (WILL BE REVERTED) mtu: 1412 vifs: - vlan_id: 100 description: Eth1 - VIF 100 (WILL BE REVERTED) mtu: 1404 - vlan_id: 101 description: Eth1 - VIF 101 (WILL BE REMOVED) enabled: true mtu: 1401 - name: eth2 description: Interface 2 (ADMIN DOWN) (WILL BE REVERTED) mtu: 1406 enabled: false state: merged - name: Assert that changes were applied assert: - that: "{{ round_trip['after'] | symmetric_difference(result['after']) |length == 0 }}" + that: round_trip['after'] | symmetric_difference(result['after']) |length == 0 - name: Revert back to base config using facts round trip register: revert vyos.vyos.vyos_interfaces: config: "{{ ansible_facts['network_resources']['interfaces'] }}" state: replaced - name: Assert that config was reverted assert: - that: "{{ base_config['after'] | symmetric_difference(revert['after']) |length == 0 }}" + that: base_config['after'] | symmetric_difference(revert['after']) |length == 0 always: - include_tasks: _remove_config.yaml 
diff --git a/tests/integration/targets/vyos_interfaces/vars/main.yaml b/tests/integration/targets/vyos_interfaces/vars/main.yaml index 4e66747b..c65771f2 100644 --- a/tests/integration/targets/vyos_interfaces/vars/main.yaml +++ b/tests/integration/targets/vyos_interfaces/vars/main.yaml 
@@ -1,174 +1,199 @@ --- merged: before: - name: eth0 enabled: true speed: auto duplex: auto - name: eth1 enabled: true - name: eth2 enabled: true commands: - set interfaces ethernet eth1 description 'Configured by Ansible - Interface 1' - set interfaces ethernet eth1 mtu '1500' + - set interfaces ethernet eth1 duplex 'auto' + - set interfaces ethernet eth1 speed 'auto' - set interfaces ethernet eth1 vif 100 description 'Eth1 - VIF 100' - set interfaces ethernet eth1 vif 100 mtu '1404' - set interfaces ethernet eth1 vif 101 description 'Eth1 - VIF 101' - set interfaces ethernet eth2 description 'Configured by Ansible - Interface 2 (ADMIN DOWN)' - set interfaces ethernet eth2 mtu '1406' - set interfaces ethernet eth2 disable after: - name: eth0 enabled: true duplex: auto speed: auto - name: eth1 description: Configured by Ansible - Interface 1 mtu: 1500 enabled: true + duplex: auto + speed: auto vifs: - vlan_id: 100 description: Eth1 - VIF 100 mtu: 1404 enabled: true - vlan_id: 101 description: Eth1 - VIF 101 enabled: true - name: eth2 description: Configured by Ansible - Interface 2 (ADMIN DOWN) mtu: 1406 enabled: false populate: - name: eth1 enabled: true description: Configured by Ansible mtu: 1500 + duplex: auto + speed: auto vifs: - vlan_id: 200 enabled: true description: VIF - 200 - name: eth2 enabled: true description: Configured by Ansible mtu: 1500 + duplex: auto + speed: auto vifs: - vlan_id: 200 enabled: true description: VIF - 200 - name: eth0 enabled: true duplex: auto speed: auto replaced: commands: - delete interfaces ethernet eth1 mtu + - delete interfaces ethernet eth1 speed + - delete interfaces ethernet eth1 duplex - delete interfaces ethernet eth1 vif 200 - set interfaces ethernet eth1 description 'Replaced by Ansible' - set interfaces ethernet eth1 vif 100 description 'VIF 100 - Replaced by Ansible' - delete interfaces ethernet eth2 vif 200 + - delete interfaces ethernet eth2 speed + - delete interfaces ethernet eth2 duplex - set interfaces 
ethernet eth2 description 'Replaced by Ansible' - set interfaces ethernet eth2 mtu '1400' after: - name: eth1 description: Replaced by Ansible enabled: true vifs: - vlan_id: 100 enabled: true description: VIF 100 - Replaced by Ansible - name: eth2 mtu: 1400 description: Replaced by Ansible enabled: true - name: eth0 enabled: true duplex: auto speed: auto parsed: after: - name: eth1 description: Configured by Ansible - Interface 1 mtu: 1500 enabled: true vifs: - vlan_id: 100 description: Eth1 - VIF 100 mtu: 1404 enabled: true - vlan_id: 101 description: Eth1 - VIF 101 enabled: true - name: eth2 description: Configured by Ansible - Interface 2 (ADMIN DOWN) mtu: 1406 enabled: false overridden: commands: - delete interfaces ethernet eth1 description - delete interfaces ethernet eth1 mtu + - delete interfaces ethernet eth1 speed + - delete interfaces ethernet eth1 duplex - delete interfaces ethernet eth1 vif 200 - delete interfaces ethernet eth2 vif 200 + - delete interfaces ethernet eth2 speed + - delete interfaces ethernet eth2 duplex - set interfaces ethernet eth2 description 'Overridden by Ansible' - set interfaces ethernet eth2 mtu '1402' after: - name: eth0 enabled: true speed: auto duplex: auto - name: eth1 enabled: true - name: eth2 enabled: true description: Overridden by Ansible mtu: 1402 rendered: commands: + - set interfaces ethernet eth0 duplex 'auto' + - set interfaces ethernet eth0 speed 'auto' + - set interfaces ethernet eth1 duplex 'auto' + - set interfaces ethernet eth1 speed 'auto' - set interfaces ethernet eth1 description 'Configured by Ansible - Interface 1' - set interfaces ethernet eth1 mtu '1500' - set interfaces ethernet eth1 vif 100 description 'Eth1 - VIF 100' - set interfaces ethernet eth1 vif 100 mtu '1404' - set interfaces ethernet eth1 vif 101 description 'Eth1 - VIF 101' - set interfaces ethernet eth2 description 'Configured by Ansible - Interface 2 (ADMIN DOWN)' - set interfaces ethernet eth2 mtu '1406' deleted: commands: - delete 
interfaces ethernet eth1 description - delete interfaces ethernet eth1 mtu - delete interfaces ethernet eth1 vif 200 + - delete interfaces ethernet eth1 speed + - delete interfaces ethernet eth1 duplex - delete interfaces ethernet eth2 description - delete interfaces ethernet eth2 mtu - delete interfaces ethernet eth2 vif 200 + - delete interfaces ethernet eth2 speed + - delete interfaces ethernet eth2 duplex after: - name: eth0 enabled: true speed: auto duplex: auto - name: eth1 enabled: true - name: eth2 enabled: true + round_trip: after: - name: eth0 enabled: true speed: auto duplex: auto - name: eth1 description: Interface 1 - Description (WILL BE REVERTED) enabled: true mtu: 1412 vifs: - vlan_id: 100 description: Eth1 - VIF 100 (WILL BE REVERTED) mtu: 1404 enabled: true - vlan_id: 101 description: Eth1 - VIF 101 (WILL BE REMOVED) enabled: true mtu: 1401 - name: eth2 description: Interface 2 (ADMIN DOWN) (WILL BE REVERTED) mtu: 1406 enabled: false diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config.cfg index f54a03dc..464f132f 100644 --- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config.cfg +++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config.cfg 
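Review bot: The new `delete interfaces ethernet ethX speed` / `duplex` entries are ordered differently across states: under `replaced` and `overridden` they precede `delete interfaces ethernet ethX vif 200`, while under `deleted` they follow it. If the integration assertions compare `commands` as an ordered list, this encodes different emission orders for what is presumably a shared code path in the module; if they compare as sets the difference is harmless but still worth aligning so the three blocks read identically. Moving the two `deleted` entries for eth1 above the `vif 200` line, and likewise for eth2, would do that.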
@@ -1,15 +1,17 @@
 set firewall group address-group RND-HOSTS address 192.0.2.1
 set firewall group address-group RND-HOSTS address 192.0.2.3
 set firewall group address-group RND-HOSTS address 192.0.2.5
 set firewall group address-group RND-HOSTS description 'This group has the Management hosts address lists'
+set firewall group address-group DELETE-HOSTS address 1.2.3.4
+set firewall group address-group DELETE-HOSTS description 'The (single) last address from this group will be deleted in the tests'
 set firewall group ipv6-address-group LOCAL-v6 address ::1
 set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1
 set firewall group ipv6-address-group LOCAL-v6 description 'This group has the hosts address lists of this machine'
 set firewall group network-group RND network 192.0.2.0/24
 set firewall group network-group RND description 'This group has the Management network addresses'
 set firewall group ipv6-network-group UNIQUE-LOCAL-v6 network fc00::/7
 set firewall group ipv6-network-group UNIQUE-LOCAL-v6 description 'This group encompasses the ULA address space in IPv6'
 set firewall group port-group SSH port 22
 set firewall group port-group SSH description 'This group has the ssh ports'
 set firewall ipv6-src-route 'enable'
 set firewall send-redirects 'enable'
diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config_v14.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config_v14.cfg
index 0a1247dd..ad60b45c 100644
--- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config_v14.cfg
+++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_global_config_v14.cfg
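Review bot: The new `DELETE-HOSTS` entry uses `1.2.3.4`, a publicly routable address, while every other IPv4 value in this fixture is drawn from the RFC 5737 documentation range (192.0.2.0/24). A non-routable fixture address is the safer convention if this config is ever pushed to a lab device or copied into documentation; for example `set firewall group address-group DELETE-HOSTS address 192.0.2.10` (constructed value from the same range) keeps the file self-consistent. The same literal appears in the v14 global fixture in the next hunk and in the `DELETE-HOSTS` members of test_vyos_firewall_global.py and test_vyos_firewall_global14.py, so all four would need to change together.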
@@ -1,18 +1,20 @@
 set firewall group address-group RND-HOSTS address 192.0.2.1
 set firewall group address-group RND-HOSTS address 192.0.2.3
 set firewall group address-group RND-HOSTS address 192.0.2.5
 set firewall group address-group RND-HOSTS description 'This group has the Management hosts address lists'
+set firewall group address-group DELETE-HOSTS address 1.2.3.4
+set firewall group address-group DELETE-HOSTS description 'The (single) last address from this group will be deleted in the tests'
 set firewall group ipv6-address-group LOCAL-v6 address ::1
 set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1
 set firewall group ipv6-address-group LOCAL-v6 description 'This group has the hosts address lists of this machine'
 set firewall group network-group RND network 192.0.2.0/24
 set firewall group network-group RND description 'This group has the Management network addresses'
 set firewall group ipv6-network-group UNIQUE-LOCAL-v6 network fc00::/7
 set firewall group ipv6-network-group UNIQUE-LOCAL-v6 description 'This group encompasses the ULA address space in IPv6'
 set firewall group port-group SSH port 22
 set firewall group port-group SSH description 'This group has the ssh ports'
 set firewall global-options all-ping enable
 set firewall global-options state-policy related action 'accept'
 set firewall global-options state-policy related log-level 'alert'
 set firewall global-options ipv6-src-route 'enable'
 set firewall global-options send-redirects 'enable'
diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg
index 6c248d2b..3ad6ec97 100644
--- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg
+++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config.cfg
@@ -1,16 +1,21 @@
 set firewall name V4-INGRESS default-action 'accept'
 set firewall ipv6-name V6-INGRESS default-action 'accept'
 set firewall name V4-INGRESS description 'This is IPv4 V4-INGRESS rule set'
 set firewall name V4-INGRESS enable-default-log
 set firewall name V4-INGRESS rule 101 protocol 'icmp'
 set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'
 set firewall name V4-INGRESS rule 101 fragment 'match-frag'
 set firewall name V4-INGRESS rule 101
 set firewall name V4-INGRESS rule 101 disable
 set firewall name V4-INGRESS rule 101 action 'accept'
 set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'
 set firewall name V4-INGRESS rule 101 log 'enable'
 set firewall name EGRESS default-action 'reject'
 set firewall ipv6-name EGRESS default-action 'reject'
 set firewall ipv6-name EGRESS rule 20
 set firewall ipv6-name EGRESS rule 20 icmpv6 type 'echo-request'
+set firewall name MULTIPLE-RULE default-action 'drop'
+set firewall name MULTIPLE-RULE rule 1 action 'accept'
+set firewall name MULTIPLE-RULE rule 1 protocol 'all'
+set firewall name MULTIPLE-RULE rule 2 action 'drop'
+set firewall name MULTIPLE-RULE rule 2 protocol 'all'
\ No newline at end of file
diff --git a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg
index e82e3903..7f63dd78 100644
--- a/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg
+++ b/tests/unit/modules/network/vyos/fixtures/vyos_firewall_rules_config_v14.cfg
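Review bot: The new side of this fixture ends without a trailing newline (`\ No newline at end of file` after the last `MULTIPLE-RULE` line), whereas the previous version's last line was plain context and so did end with one; the commit silently strips it. The v14 rules fixture in the next hunk has the same problem. Fixture loaders that split on newlines are usually tolerant of this, but it produces noisy diffs on the next edit and trips end-of-file linters; add the final newline after `set firewall name MULTIPLE-RULE rule 2 protocol 'all'` in both files.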
@@ -1,34 +1,39 @@
 set firewall ipv4 name V4-INGRESS default-action 'accept'
 set firewall ipv6 name V6-INGRESS default-action 'accept'
 set firewall ipv4 name V4-INGRESS description 'This is IPv4 V4-INGRESS rule set'
 set firewall ipv4 name V4-INGRESS default-log
 set firewall ipv4 name V4-INGRESS rule 101 protocol 'icmp'
 set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'
 set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 100
 set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 300
 set firewall ipv4 name V4-INGRESS rule 101 log
 set firewall ipv4 name V4-INGRESS rule 101
 set firewall ipv4 name V4-INGRESS rule 101 disable
 set firewall ipv4 name V4-INGRESS rule 101 action 'accept'
 set firewall ipv4 name EGRESS default-action 'reject'
 set firewall ipv6 name EGRESS default-action 'reject'
 set firewall ipv6 name EGRESS rule 20
 set firewall ipv6 name EGRESS rule 20 icmpv6 type-name 'echo-request'
 set firewall ipv6 input filter rule 1
 set firewall ipv6 input filter rule 1 action 'jump'
 set firewall ipv6 input filter rule 1 jump-target 'V6-INGRESS'
 set firewall ipv6 output filter rule 1
 set firewall ipv6 output filter rule 1 action 'jump'
 set firewall ipv6 output filter rule 1 jump-target 'EGRESS'
 set firewall ipv4 input filter rule 1
 set firewall ipv4 input filter rule 1 action 'jump'
 set firewall ipv4 input filter rule 1 jump-target 'INGRESS'
 set firewall ipv4 output filter rule 1
 set firewall ipv4 output filter rule 1 action 'jump'
 set firewall ipv4 output filter rule 1 jump-target 'EGRESS'
 set firewall ipv4 name IF-TEST rule 10 disable
 set firewall ipv4 name IF-TEST rule 10 action 'accept'
 set firewall ipv4 name IF-TEST rule 10 inbound-interface name 'eth0'
 set firewall ipv4 name IF-TEST rule 10 outbound-interface group 'the-ethers'
 set firewall ipv4 name IF-TEST rule 10 icmp type-name 'echo-request'
 set firewall ipv4 name IF-TEST rule 10 state 'related'
+set firewall ipv4 name MULTIPLE-RULE default-action 'drop'
+set firewall ipv4 name MULTIPLE-RULE rule 1 action 'accept'
+set firewall ipv4 name MULTIPLE-RULE rule 1 protocol 'all'
+set firewall ipv4 name MULTIPLE-RULE rule 2 action 'drop'
+set firewall ipv4 name MULTIPLE-RULE rule 2 protocol 'all'
\ No newline at end of file
diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_global.py b/tests/unit/modules/network/vyos/test_vyos_firewall_global.py
index 481cc1dd..db67ab2c 100644
--- a/tests/unit/modules/network/vyos/test_vyos_firewall_global.py
+++ b/tests/unit/modules/network/vyos/test_vyos_firewall_global.py
@@ -1,455 +1,471 @@ # (c) 2016 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # Make coding more python3-ish from __future__ import absolute_import, division, print_function __metaclass__ = type from unittest.mock import patch from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_global from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args from .vyos_module import TestVyosModule, load_fixture class TestVyosFirewallGlobalModule(TestVyosModule): module = vyos_firewall_global def setUp(self): super(TestVyosFirewallGlobalModule, self).setUp() self.mock_get_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config", ) self.get_config = self.mock_get_config.start() self.mock_load_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config", ) self.load_config = self.mock_load_config.start() self.mock_get_resource_connection_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection", ) self.get_resource_connection_config = self.mock_get_resource_connection_config.start() self.mock_get_resource_connection_facts = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection", ) self.get_resource_connection_facts = 
self.mock_get_resource_connection_facts.start() self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_global.firewall_global.Firewall_globalFacts.get_device_data", ) self.mock_get_os_version = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_global.firewall_global.get_os_version", ) self.get_os_version = self.mock_get_os_version.start() self.get_os_version.return_value = "1.3" self.execute_show_command = self.mock_execute_show_command.start() self.maxDiff = None def tearDown(self): super(TestVyosFirewallGlobalModule, self).tearDown() self.mock_get_resource_connection_config.stop() self.mock_get_resource_connection_facts.stop() self.mock_get_config.stop() self.mock_load_config.stop() self.mock_execute_show_command.stop() self.mock_get_os_version.stop() def load_fixtures(self, commands=None, filename=None): def load_from_file(*args, **kwargs): return load_fixture("vyos_firewall_global_config.cfg") self.execute_show_command.side_effect = load_from_file def test_vyos_firewall_global_set_01_merged(self): set_module_args( dict( config=dict( validation="strict", config_trap=True, log_martians=True, syn_cookies=True, twa_hazards_protection=True, ping=dict(all=True, broadcast=True), state_policy=[ dict( connection_type="established", action="accept", log=True, log_level="emerg", ), dict(connection_type="invalid", action="reject"), ], route_redirects=[ dict( afi="ipv4", ip_src_route=True, icmp_redirects=dict(send=True, receive=False), ), dict( afi="ipv6", ip_src_route=True, icmp_redirects=dict(receive=False), ), ], group=dict( address_group=[ dict( afi="ipv4", name="MGMT-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.1.1"), dict(address="192.0.1.3"), dict(address="192.0.1.5"), ], ), dict( afi="ipv6", name="GOOGLE-DNS-v6", members=[ dict(address="2001:4860:4860::8888"), dict(address="2001:4860:4860::8844"), ], ), ], 
network_group=[ dict( afi="ipv4", name="MGMT", description="This group has the Management network addresses", members=[dict(address="192.0.1.0/24")], ), dict( afi="ipv6", name="DOCUMENTATION-v6", description="IPv6 Addresses reserved for documentation per RFC 3849", members=[ dict(address="2001:0DB8::/32"), dict(address="3FFF:FFFF::/32"), ], ), ], port_group=[ dict( name="TELNET", description="This group has the telnet ports", members=[dict(port="23")], ), ], ), ), state="merged", ), ) commands = [ "set firewall group address-group MGMT-HOSTS address 192.0.1.1", "set firewall group address-group MGMT-HOSTS address 192.0.1.3", "set firewall group address-group MGMT-HOSTS address 192.0.1.5", "set firewall group address-group MGMT-HOSTS description 'This group has the Management hosts address lists'", "set firewall group address-group MGMT-HOSTS", "set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8888", "set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8844", "set firewall group ipv6-address-group GOOGLE-DNS-v6", "set firewall group network-group MGMT network 192.0.1.0/24", "set firewall group network-group MGMT description 'This group has the Management network addresses'", "set firewall group network-group MGMT", "set firewall group ipv6-network-group DOCUMENTATION-v6 network 2001:0DB8::/32", "set firewall group ipv6-network-group DOCUMENTATION-v6 network 3FFF:FFFF::/32", "set firewall group ipv6-network-group DOCUMENTATION-v6 description 'IPv6 Addresses reserved for documentation per RFC 3849'", "set firewall group ipv6-network-group DOCUMENTATION-v6", "set firewall group port-group TELNET port 23", "set firewall group port-group TELNET description 'This group has the telnet ports'", "set firewall group port-group TELNET", "set firewall ip-src-route 'enable'", "set firewall receive-redirects 'disable'", "set firewall config-trap 'enable'", "set firewall ipv6-receive-redirects 'disable'", "set firewall 
state-policy established action 'accept'", "set firewall state-policy established log 'enable'", "set firewall state-policy invalid action 'reject'", "set firewall broadcast-ping 'enable'", "set firewall all-ping 'enable'", "set firewall log-martians 'enable'", "set firewall twa-hazards-protection 'enable'", "set firewall syn-cookies 'enable'", "set firewall source-validation 'strict'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_global_set_01_merged_idem(self): set_module_args( dict( config=dict( group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.3"), dict(address="192.0.2.5"), ], ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::1"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", description="This group has the Management network addresses", members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="22")], ), ], ), ), state="merged", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_global_set_01_replaced(self): set_module_args( dict( config=dict( group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.7"), dict(address="192.0.2.9"), ], ), + dict( + afi="ipv4", + name="DELETE-HOSTS", + description="The (single) last address from this group will be deleted in the tests", + # No members here + ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of 
this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::2"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", # Deleted the description here. members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="2222")], ), ], ), ), state="replaced", ), ) commands = [ "delete firewall ipv6-src-route", "delete firewall send-redirects", "delete firewall group address-group RND-HOSTS address 192.0.2.3", "delete firewall group address-group RND-HOSTS address 192.0.2.5", + "delete firewall group address-group DELETE-HOSTS address", "set firewall group address-group RND-HOSTS address 192.0.2.7", "set firewall group address-group RND-HOSTS address 192.0.2.9", "delete firewall group network-group RND description", "delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1", "set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2", "delete firewall group port-group SSH port 22", "set firewall group port-group SSH port 2222", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_global_set_02_replaced(self): set_module_args( dict( config=dict( state_policy=[ dict(connection_type="invalid", action="reject"), dict(connection_type="related", action="drop"), ], group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.7"), dict(address="192.0.2.9"), ], ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::2"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", description="This group has the Management network 
addresses", members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="2222")], ), ], ), ), state="replaced", ), ) commands = [ + "delete firewall group address-group DELETE-HOSTS", "delete firewall group address-group RND-HOSTS address 192.0.2.3", "delete firewall group address-group RND-HOSTS address 192.0.2.5", "delete firewall ipv6-src-route", "delete firewall send-redirects", "set firewall state-policy related action 'drop'", "set firewall state-policy invalid action 'reject'", "set firewall group address-group RND-HOSTS address 192.0.2.7", "set firewall group address-group RND-HOSTS address 192.0.2.9", "delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1", "set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2", "delete firewall group port-group SSH port 22", "set firewall group port-group SSH port 2222", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_global_set_01_replaced_idem(self): set_module_args( dict( config=dict( route_redirects=[ dict(ip_src_route=True, afi="ipv6"), dict(icmp_redirects=dict(send=True), afi="ipv4"), ], group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.3"), dict(address="192.0.2.5"), ], ), + dict( + afi="ipv4", + name="DELETE-HOSTS", + description="The (single) last address from this group will be deleted in the tests", + members=[ + dict(address='1.2.3.4'), + ] + ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::1"), ], ), ], network_group=[ dict( afi="ipv4", 
name="RND", description="This group has the Management network addresses", members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="22")], ), ], ), ), state="replaced", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_global_set_01_deleted(self): set_module_args(dict(config=dict(), state="deleted")) commands = ["delete firewall"] self.execute_module(changed=True, commands=commands) diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py b/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py index aae4aa83..0b85e62d 100644 --- a/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py +++ b/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py 
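Review bot: In the `DELETE-HOSTS` entry added to `test_vyos_firewall_global_set_01_replaced_idem`, `dict(address='1.2.3.4')` uses single quotes and the closing `]` of `members` carries no trailing comma, while the rest of the file uses double quotes and trailing commas throughout; `dict(address="1.2.3.4"),` followed by `],` brings it in line and avoids a formatter rewriting these lines on the next run. The same two lines appear in test_vyos_firewall_global14.py. Separately, `test_vyos_firewall_global_set_01_replaced` only exercises a group with the `members` key omitted (`# No members here`) and expects `delete firewall group address-group DELETE-HOSTS address`; a sibling case passing `members=[]` would pin down whether the module treats an omitted key and an explicit empty list the same way, which the current expectations leave open.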
@@ -1,467 +1,483 @@ # (c) 2016 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # Make coding more python3-ish from __future__ import absolute_import, division, print_function __metaclass__ = type from unittest.mock import patch from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_global from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args from .vyos_module import TestVyosModule, load_fixture class TestVyosFirewallRulesModule14(TestVyosModule): module = vyos_firewall_global def setUp(self): super(TestVyosFirewallRulesModule14, self).setUp() self.mock_get_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config", ) self.get_config = self.mock_get_config.start() self.mock_load_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config", ) self.load_config = self.mock_load_config.start() self.mock_get_resource_connection_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection", ) self.get_resource_connection_config = self.mock_get_resource_connection_config.start() self.mock_get_resource_connection_facts = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection", ) self.get_resource_connection_facts = 
self.mock_get_resource_connection_facts.start() self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_global.firewall_global.Firewall_globalFacts.get_device_data", ) self.mock_get_os_version = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_global.firewall_global.get_os_version", ) self.get_os_version = self.mock_get_os_version.start() self.get_os_version.return_value = "1.4" self.execute_show_command = self.mock_execute_show_command.start() self.maxDiff = None def tearDown(self): super(TestVyosFirewallRulesModule14, self).tearDown() self.mock_get_resource_connection_config.stop() self.mock_get_resource_connection_facts.stop() self.mock_get_config.stop() self.mock_load_config.stop() self.mock_execute_show_command.stop() self.mock_get_os_version.stop() def load_fixtures(self, commands=None, filename=None): def load_from_file(*args, **kwargs): return load_fixture("vyos_firewall_global_config_v14.cfg") self.execute_show_command.side_effect = load_from_file def test_vyos_firewall_global_set_01_merged(self): set_module_args( dict( config=dict( validation="strict", config_trap=True, log_martians=True, syn_cookies=True, twa_hazards_protection=True, ping=dict(all=True, broadcast=True), state_policy=[ dict( connection_type="established", action="accept", log=True, log_level="emerg", ), dict(connection_type="invalid", action="reject"), ], route_redirects=[ dict( afi="ipv4", ip_src_route=True, icmp_redirects=dict(send=True, receive=False), ), dict( afi="ipv6", ip_src_route=True, icmp_redirects=dict(receive=False), ), ], group=dict( address_group=[ dict( afi="ipv4", name="MGMT-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.1.1"), dict(address="192.0.1.3"), dict(address="192.0.1.5"), ], ), dict( afi="ipv6", name="GOOGLE-DNS-v6", members=[ dict(address="2001:4860:4860::8888"), dict(address="2001:4860:4860::8844"), ], ), 
], network_group=[ dict( afi="ipv4", name="MGMT", description="This group has the Management network addresses", members=[dict(address="192.0.1.0/24")], ), dict( afi="ipv6", name="DOCUMENTATION-v6", description="IPv6 Addresses reserved for documentation per RFC 3849", members=[ dict(address="2001:0DB8::/32"), dict(address="3FFF:FFFF::/32"), ], ), ], port_group=[ dict( name="TELNET", description="This group has the telnet ports", members=[dict(port="23")], ), ], ), ), state="merged", ), ) commands = [ "set firewall group address-group MGMT-HOSTS address 192.0.1.1", "set firewall group address-group MGMT-HOSTS address 192.0.1.3", "set firewall group address-group MGMT-HOSTS address 192.0.1.5", "set firewall group address-group MGMT-HOSTS description 'This group has the Management hosts address lists'", "set firewall group address-group MGMT-HOSTS", "set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8888", "set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8844", "set firewall group ipv6-address-group GOOGLE-DNS-v6", "set firewall group network-group MGMT network 192.0.1.0/24", "set firewall group network-group MGMT description 'This group has the Management network addresses'", "set firewall group network-group MGMT", "set firewall group ipv6-network-group DOCUMENTATION-v6 network 2001:0DB8::/32", "set firewall group ipv6-network-group DOCUMENTATION-v6 network 3FFF:FFFF::/32", "set firewall group ipv6-network-group DOCUMENTATION-v6 description 'IPv6 Addresses reserved for documentation per RFC 3849'", "set firewall group ipv6-network-group DOCUMENTATION-v6", "set firewall group port-group TELNET port 23", "set firewall group port-group TELNET description 'This group has the telnet ports'", "set firewall group port-group TELNET", "set firewall global-options ip-src-route 'enable'", "set firewall global-options receive-redirects 'disable'", "set firewall global-options config-trap 'enable'", "set firewall 
global-options ipv6-receive-redirects 'disable'", "set firewall global-options state-policy established action 'accept'", "set firewall global-options state-policy established log", "set firewall global-options state-policy established log-level 'emerg'", "set firewall global-options state-policy invalid action 'reject'", "set firewall global-options broadcast-ping 'enable'", "set firewall global-options log-martians 'enable'", "set firewall global-options twa-hazards-protection 'enable'", "set firewall global-options syn-cookies 'enable'", "set firewall global-options source-validation 'strict'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_global_set_01_merged_idem(self): set_module_args( dict( config=dict( group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.3"), dict(address="192.0.2.5"), ], ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::1"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", description="This group has the Management network addresses", members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="22")], ), ], ), ), state="merged", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_global_set_01_replaced(self): set_module_args( dict( config=dict( state_policy=[ dict(connection_type="invalid", action="reject"), ], group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.7"), 
dict(address="192.0.2.9"), ], ), + dict( + afi="ipv4", + name="DELETE-HOSTS", + description="The (single) last address from this group will be deleted in the tests", + # No members here + ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::2"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", # Deleted the description here. members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="2222")], ), ], ), ), state="replaced", ), ) commands = [ "delete firewall group address-group RND-HOSTS address 192.0.2.3", "delete firewall group address-group RND-HOSTS address 192.0.2.5", + "delete firewall group address-group DELETE-HOSTS address", "delete firewall global-options all-ping", "delete firewall global-options state-policy related", "delete firewall global-options ipv6-src-route", "delete firewall global-options send-redirects", "set firewall global-options state-policy invalid action 'reject'", "set firewall group address-group RND-HOSTS address 192.0.2.7", "set firewall group address-group RND-HOSTS address 192.0.2.9", "delete firewall group network-group RND description", "delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1", "set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2", "delete firewall group port-group SSH port 22", "set firewall group port-group SSH port 2222", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_global_set_01_replaced_idem(self): set_module_args( dict( config=dict( ping=dict(all=True), route_redirects=[ dict(ip_src_route=True, afi="ipv6"), dict(icmp_redirects=dict(send=True), afi="ipv4"), ], state_policy=[ 
dict(connection_type="related", action="accept", log_level="alert"), ], group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.3"), dict(address="192.0.2.5"), ], ), + dict( + afi="ipv4", + name="DELETE-HOSTS", + description="The (single) last address from this group will be deleted in the tests", + members=[ + dict(address='1.2.3.4'), + ] + ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::1"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", description="This group has the Management network addresses", members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="22")], ), ], ), ), state="replaced", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_global_set_02_replaced(self): set_module_args( dict( config=dict( state_policy=[ dict(connection_type="invalid", action="reject"), dict(connection_type="related", action="drop"), ], group=dict( address_group=[ dict( afi="ipv4", name="RND-HOSTS", description="This group has the Management hosts address lists", members=[ dict(address="192.0.2.1"), dict(address="192.0.2.7"), dict(address="192.0.2.9"), ], ), dict( afi="ipv6", name="LOCAL-v6", description="This group has the hosts address lists of this machine", members=[ dict(address="::1"), dict(address="fdec:2503:89d6:59b3::2"), ], ), ], network_group=[ dict( afi="ipv4", name="RND", description="This group has the Management network addresses", members=[dict(address="192.0.2.0/24")], ), dict( afi="ipv6", name="UNIQUE-LOCAL-v6", description="This group encompasses 
the ULA address space in IPv6", members=[dict(address="fc00::/7")], ), ], port_group=[ dict( name="SSH", description="This group has the ssh ports", members=[dict(port="2222")], ), ], ), ), state="replaced", ), ) commands = [ "delete firewall group address-group RND-HOSTS address 192.0.2.3", "delete firewall group address-group RND-HOSTS address 192.0.2.5", "delete firewall global-options all-ping", "delete firewall global-options ipv6-src-route", "delete firewall global-options send-redirects", "set firewall global-options state-policy related action 'drop'", "delete firewall global-options state-policy related log-level", + "delete firewall group address-group DELETE-HOSTS", "set firewall global-options state-policy invalid action 'reject'", "set firewall group address-group RND-HOSTS address 192.0.2.7", "set firewall group address-group RND-HOSTS address 192.0.2.9", "delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1", "set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2", "delete firewall group port-group SSH port 22", "set firewall group port-group SSH port 2222", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_global_set_01_deleted(self): set_module_args(dict(config=dict(), state="deleted")) commands = ["delete firewall"] self.execute_module(changed=True, commands=commands)
diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py b/tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py
index 101f389e..9a25198f 100644
--- a/tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py
+++ b/tests/unit/modules/network/vyos/test_vyos_firewall_rules13.py
@@ -1,1439 +1,1472 @@ # (c) 2016 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # Make coding more python3-ish from __future__ import absolute_import, division, print_function __metaclass__ = type from unittest.mock import patch from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_rules from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args from .vyos_module import TestVyosModule, load_fixture class TestVyosFirewallRulesModule13(TestVyosModule): module = vyos_firewall_rules def setUp(self): super(TestVyosFirewallRulesModule13, self).setUp() self.mock_get_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config", ) self.get_config = self.mock_get_config.start() self.mock_load_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config", ) self.load_config = self.mock_load_config.start() self.mock_get_resource_connection_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection", ) self.get_resource_connection_config = self.mock_get_resource_connection_config.start() self.mock_get_resource_connection_facts = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection", ) self.get_resource_connection_facts = 
self.mock_get_resource_connection_facts.start() self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data", ) self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data", ) self.execute_show_command = self.mock_execute_show_command.start() self.mock_get_os_version = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version", ) self.get_os_version = self.mock_get_os_version.start() self.get_os_version.return_value = "1.2" def tearDown(self): super(TestVyosFirewallRulesModule13, self).tearDown() self.mock_get_resource_connection_config.stop() self.mock_get_resource_connection_facts.stop() self.mock_get_config.stop() self.mock_load_config.stop() self.mock_execute_show_command.stop() self.mock_get_os_version.stop() def load_fixtures(self, commands=None, filename=None): def load_from_file(*args, **kwargs): return load_fixture("vyos_firewall_rules_config.cfg") self.execute_show_command.side_effect = load_from_file def test_vyos_firewall_rule_set_01_merged(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="V6-INBOUND", description="This is IPv6 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V6-OUTBOUND", description="This is IPv6 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), dict( afi="ipv4", rule_sets=[ dict( name="V4-INBOUND", description="This is IPv4 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V4-OUTBOUND", description="This is IPv4 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name V6-INBOUND default-action 
'reject'", "set firewall ipv6-name V6-INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6-name V6-INBOUND enable-default-log", "set firewall ipv6-name V6-OUTBOUND default-action 'accept'", "set firewall ipv6-name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'", "set firewall name V4-INBOUND default-action 'reject'", "set firewall name V4-INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall name V4-INBOUND enable-default-log", "set firewall name V4-OUTBOUND default-action 'accept'", "set firewall name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_01(self): """Test if plugin correctly adds new rules set and a rule with variant attributes""" set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", description="This is IPv4 INBOUND rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", log="disable", protocol="icmp", fragment="match-frag", disable=True, ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall name INBOUND default-action 'accept'", "set firewall name INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall name INBOUND enable-default-log", "set firewall name INBOUND rule 101 protocol 'icmp'", "set firewall name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall name INBOUND rule 101 fragment 'match-frag'", "set firewall name INBOUND rule 101", "set firewall name INBOUND rule 101 disable", "set firewall name INBOUND rule 101 action 'accept'", "set firewall name INBOUND rule 101 ipsec 'match-ipsec'", "set firewall name INBOUND rule 101 log 'disable'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_02(self): """Test if plugin correctly adds new 
rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="tcp", source=dict( address="192.0.2.0", mac_address="38:00:25:19:76:0c", port=2127, ), destination=dict(address="192.0.1.0", port=2124), limit=dict( burst=10, rate=dict(number=20, unit="second"), ), recent=dict(count=10, time=20), state=dict( established=True, related=True, invalid=True, new=True, ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall name INBOUND rule 101 protocol 'tcp'", "set firewall name INBOUND rule 101 destination address 192.0.1.0", "set firewall name INBOUND rule 101 destination port 2124", "set firewall name INBOUND rule 101", "set firewall name INBOUND rule 101 source address 192.0.2.0", "set firewall name INBOUND rule 101 source mac-address 38:00:25:19:76:0c", "set firewall name INBOUND rule 101 source port 2127", "set firewall name INBOUND rule 101 state new enable", "set firewall name INBOUND rule 101 state invalid enable", "set firewall name INBOUND rule 101 state related enable", "set firewall name INBOUND rule 101 state established enable", "set firewall name INBOUND rule 101 limit burst 10", "set firewall name INBOUND rule 101 limit rate 20/second", "set firewall name INBOUND rule 101 recent count 10", "set firewall name INBOUND rule 101 recent time 20", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_03(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", destination=dict( group=dict( address_group="OUT-ADDR-GROUP", network_group="OUT-NET-GROUP", port_group="OUT-PORT-GROUP", ), ), source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ), ), ), ], ), ], ), ], 
state="merged", ), ) commands = [ "set firewall name INBOUND rule 101 source group address-group IN-ADDR-GROUP", "set firewall name INBOUND rule 101 source group network-group IN-NET-GROUP", "set firewall name INBOUND rule 101 source group port-group IN-PORT-GROUP", "set firewall name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP", "set firewall name INBOUND rule 101 destination group network-group OUT-NET-GROUP", "set firewall name INBOUND rule 101 destination group port-group OUT-PORT-GROUP", "set firewall name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_04(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", time=dict( monthdays="2", startdate="2020-01-24", starttime="13:20:00", stopdate="2020-01-28", stoptime="13:30:00", weekdays="!Sat,Sun", utc=True, ), tcp=dict( flags=[ dict(flag="all"), ] ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall name INBOUND rule 101", "set firewall name INBOUND rule 101 tcp flags ALL", "set firewall name INBOUND rule 101 time utc", "set firewall name INBOUND rule 101 time monthdays 2", "set firewall name INBOUND rule 101 time startdate 2020-01-24", "set firewall name INBOUND rule 101 time stopdate 2020-01-28", "set firewall name INBOUND rule 101 time weekdays !Sat,Sun", "set firewall name INBOUND rule 101 time stoptime 13:30:00", "set firewall name INBOUND rule 101 time starttime 13:20:00", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_01(self): """Test if plugin correctly adds new ipv6 rules set and a rule with variant attributes""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set", default_action="accept", 
enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", disable=True, icmp=dict(type_name="echo-request"), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name INBOUND default-action 'accept'", "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6-name INBOUND enable-default-log", "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 disable", "set firewall ipv6-name INBOUND rule 101 action 'accept'", "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'", "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_02(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="tcp", source=dict( address="2001:db8::12", mac_address="38:00:25:19:76:0c", port=2127, ), destination=dict(address="2001:db8::11", port=2124), limit=dict( burst=10, rate=dict(number=20, unit="second"), ), recent=dict(count=10, time=20), state=dict( established=True, related=True, invalid=True, new=True, ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name INBOUND rule 101 protocol 'tcp'", "set firewall ipv6-name INBOUND rule 101 destination address 2001:db8::11", "set firewall ipv6-name INBOUND rule 101 destination port 2124", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 source address 2001:db8::12", "set firewall ipv6-name INBOUND rule 101 source mac-address 38:00:25:19:76:0c", "set firewall ipv6-name 
INBOUND rule 101 source port 2127", "set firewall ipv6-name INBOUND rule 101 state new enable", "set firewall ipv6-name INBOUND rule 101 state invalid enable", "set firewall ipv6-name INBOUND rule 101 state related enable", "set firewall ipv6-name INBOUND rule 101 state established enable", "set firewall ipv6-name INBOUND rule 101 limit burst 10", "set firewall ipv6-name INBOUND rule 101 recent count 10", "set firewall ipv6-name INBOUND rule 101 recent time 20", "set firewall ipv6-name INBOUND rule 101 limit rate 20/second", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_03(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", destination=dict( group=dict( address_group="OUT-ADDR-GROUP", network_group="OUT-NET-GROUP", port_group="OUT-PORT-GROUP", ), ), source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ), ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name INBOUND rule 101 source group address-group IN-ADDR-GROUP", "set firewall ipv6-name INBOUND rule 101 source group network-group IN-NET-GROUP", "set firewall ipv6-name INBOUND rule 101 source group port-group IN-PORT-GROUP", "set firewall ipv6-name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP", "set firewall ipv6-name INBOUND rule 101 destination group network-group OUT-NET-GROUP", "set firewall ipv6-name INBOUND rule 101 destination group port-group OUT-PORT-GROUP", "set firewall ipv6-name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_04(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( 
name="INBOUND", rules=[ dict( number="101", time=dict( monthdays="2", startdate="2020-01-24", starttime="13:20:00", stopdate="2020-01-28", stoptime="13:30:00", weekdays="!Sat,Sun", utc=True, ), tcp=dict( flags=[ dict(flag="all"), ] ), ), dict( number="102", tcp=dict( flags=[ dict(flag="ack"), dict(flag="syn"), dict(flag="fin", invert=True), ], ) ) ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 tcp flags ALL", "set firewall ipv6-name INBOUND rule 101 time utc", "set firewall ipv6-name INBOUND rule 101 time monthdays 2", "set firewall ipv6-name INBOUND rule 101 time startdate 2020-01-24", "set firewall ipv6-name INBOUND rule 101 time stopdate 2020-01-28", "set firewall ipv6-name INBOUND rule 101 time weekdays !Sat,Sun", "set firewall ipv6-name INBOUND rule 101 time stoptime 13:30:00", "set firewall ipv6-name INBOUND rule 101 time starttime 13:20:00", "set firewall ipv6-name INBOUND rule 102", "set firewall ipv6-name INBOUND rule 102 tcp flags ACK,SYN,!FIN", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type_name="port-unreachable"), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name INBOUND rule 101 icmpv6 type port-unreachable", "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6-name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_01(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", 
protocol="icmp", icmp=dict(type=1, code=1), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall name INBOUND rule 101 icmp type 1", "set firewall name INBOUND rule 101 icmp code 1", "set firewall name INBOUND rule 101 protocol 'icmp'", "set firewall name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_02(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type_name="echo-request"), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall name INBOUND rule 101 icmp type-name echo-request", "set firewall name INBOUND rule 101 protocol 'icmp'", "set firewall name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_del_01(self): """Test if plugin correctly removes existing rule set """ set_module_args( dict( config=[dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")])], state="deleted", ), ) commands = ["delete firewall name V4-INGRESS"] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_02(self): """Test if plugin correctly removes existing rule sets, both ipv4 and ipv6 """ set_module_args( dict( config=[ dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")]), dict(afi="ipv6", rule_sets=[dict(name="V6-INGRESS")]), ], state="deleted", ), ) commands = [ "delete firewall name V4-INGRESS", "delete firewall ipv6-name V6-INGRESS", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_03(self): """Test if plugin correctly removes existing AFIs, both ipv4 and ipv6 """ set_module_args(dict(config=[], state="deleted")) commands = ["delete firewall name", "delete firewall ipv6-name"] self.execute_module(changed=True, commands=commands) def 
test_vyos_firewall_v4v6_rule_sets_del_04(self): """Test if plugin has no effect on non-existent rule sets """ set_module_args( dict( config=[ dict(afi="ipv4", rule_sets=[dict(name="V4-ING")]), dict(afi="ipv6", rule_sets=[dict(name="V6-ING")]), ], state="deleted", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_rep_01(self): """Test if plugin correctly replaces a particular rule set(s) without affecting the others """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="reject", description="Rule 101 is configured by Ansible RM", ipsec="match-ipsec", protocol="tcp", fragment="match-frag", disable=False, ), dict( number="102", action="accept", description="Rule 102 is configured by Ansible RM", protocol="icmp", disable=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", description="This rule-set is configured by Ansible RM", ), dict( name="EGRESS", default_action="reject", description="This rule-set is configured by Ansible RM", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), ], ), ], state="replaced", ), ) commands = [ "delete firewall name V4-INGRESS rule 101", "set firewall name V4-INGRESS rule 101", "set firewall name V4-INGRESS description 'This is IPv4 INGRESS rule set'", "set firewall name V4-INGRESS rule 101 fragment 'match-frag'", "set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'", "set firewall name V4-INGRESS rule 101 protocol 'tcp'", "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'", "set firewall name V4-INGRESS rule 101 action 'reject'", "set firewall name V4-INGRESS rule 102 disable", "set firewall name V4-INGRESS rule 102 action 'accept'", "set firewall name V4-INGRESS rule 102 protocol 'icmp'", "set firewall name 
V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'", "set firewall name V4-INGRESS rule 102", "set firewall ipv6-name V6-INGRESS description 'This rule-set is configured by Ansible RM'", "set firewall ipv6-name EGRESS description 'This rule-set is configured by Ansible RM'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_rep_02(self): """Test if plugin correctly replaces a particular rule(s) and rule set attribute(s) without affecting the others """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=False, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disable=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), ], ), ], state="replaced", ), ) commands = [ "delete firewall name V4-INGRESS rule 101", "delete firewall name V4-INGRESS enable-default-log", "set firewall name V4-INGRESS rule 101", "set firewall name V4-INGRESS rule 101 action 'accept'", "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'", "set firewall name V4-INGRESS rule 101 disable", "set firewall name V4-INGRESS rule 101 fragment 'match-frag'", "set firewall name V4-INGRESS rule 101 ipsec 'match-ipsec'", "set firewall name V4-INGRESS rule 101 protocol 'icmp'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_01(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", 
default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disable=True, log="enable", ) ], ), dict( name="EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), ], ), ], state="replaced", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_02(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disable=True, log="enable" ), ], ), ], ), ], state="replaced", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_mer_idem_01(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disable=True, ) ], ), dict( name="EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), ], ), ], state="merged", ), ) 
self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_01(self): """Test if plugin correctly resets the entire rule set if there is a change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-IN", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="1", action="reject", description="Rule 1 is configured by Ansible RM", ipsec="match-ipsec", log="enable", protocol="tcp", fragment="match-frag", disable=False, source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ), ), ), dict( number="2", action="accept", description="Rule 102 is configured by Ansible RM", protocol="icmp", disable=True, ), ], ), + dict( + name="MULTIPLE-RULE", + default_action="drop", + rules=[ + dict( + number="1", + action="accept", + protocol="all", + ), + ], + ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-IN", default_action="accept", description="This rule-set is configured by Ansible RM", ), dict( name="V6-EG", default_action="reject", description="This rule-set is configured by Ansible RM", ), ], ), ], state="overridden", ), ) commands = [ "delete firewall ipv6-name V6-INGRESS", "delete firewall ipv6-name EGRESS", "delete firewall name V4-INGRESS", "delete firewall name EGRESS", + "delete firewall name MULTIPLE-RULE", + "set firewall name MULTIPLE-RULE default-action 'drop'", + "set firewall name MULTIPLE-RULE rule 1", + "set firewall name MULTIPLE-RULE rule 1 action 'accept'", + "set firewall name MULTIPLE-RULE rule 1 protocol 'all'", "set firewall name V4-IN default-action 'accept'", "set firewall name V4-IN description 'This is IPv4 INGRESS rule set'", "set firewall name V4-IN enable-default-log", "set firewall name V4-IN rule 1 protocol 'tcp'", "set firewall name V4-IN rule 1 log 'enable'", "set firewall name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'", 
"set firewall name V4-IN rule 1 fragment 'match-frag'", "set firewall name V4-IN rule 1 source group address-group IN-ADDR-GROUP", "set firewall name V4-IN rule 1 source group network-group IN-NET-GROUP", "set firewall name V4-IN rule 1 source group port-group IN-PORT-GROUP", "set firewall name V4-IN rule 1", "set firewall name V4-IN rule 1 action 'reject'", "set firewall name V4-IN rule 1 ipsec 'match-ipsec'", "set firewall name V4-IN rule 2 disable", "set firewall name V4-IN rule 2 action 'accept'", "set firewall name V4-IN rule 2 protocol 'icmp'", "set firewall name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'", "set firewall name V4-IN rule 2", "set firewall ipv6-name V6-IN default-action 'accept'", "set firewall ipv6-name V6-IN description 'This rule-set is configured by Ansible RM'", "set firewall ipv6-name V6-EG default-action 'reject'", "set firewall ipv6-name V6-EG description 'This rule-set is configured by Ansible RM'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_02(self): """Test if plugin correctly resets the entire rule set while removing the absent ones if there is a change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", protocol="udp", ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="EGRESS", default_action="reject", description="This rule-set is configured by Ansible RM", rules=[ dict( number="20", action="accept", protocol="udp", ), ], ), ], ), ], state="overridden", ), ) commands = [ "delete firewall ipv6-name V6-INGRESS", "delete firewall ipv6-name EGRESS", "delete firewall name V4-INGRESS", "delete firewall name EGRESS", + "delete firewall name MULTIPLE-RULE", "set firewall name V4-INGRESS rule 101", "set firewall name V4-INGRESS description 'This is IPv4 
INGRESS rule set'", "set firewall name V4-INGRESS default-action 'accept'", "set firewall name V4-INGRESS enable-default-log", "set firewall name V4-INGRESS rule 101 protocol 'udp'", "set firewall name V4-INGRESS rule 101 action 'accept'", "set firewall ipv6-name EGRESS description 'This rule-set is configured by Ansible RM'", "set firewall ipv6-name EGRESS default-action 'reject'", "set firewall ipv6-name EGRESS rule 20", "set firewall ipv6-name EGRESS rule 20 protocol 'udp'", "set firewall ipv6-name EGRESS rule 20 action 'accept'" ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_idem_01(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disable=True, log="enable", ) ], ), dict( name="EGRESS", default_action="reject", ), + dict( + name="MULTIPLE-RULE", + default_action="drop", + rules=[ + dict( + number="1", + action="accept", + protocol="all", + ), + dict( + number="2", + action="drop", + protocol="all", + ), + ], + ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), ], ), ], state="overridden", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v6_rule_sets_rule_merged_01_version(self): """Test if plugin correctly adds ipv6 rule set with rules """ self.get_os_version.return_value = "1.3" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set", default_action="accept", 
enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", disable=True, icmp=dict(type_name="echo-request"), log="enable", ), dict( number="102", action="reject", description="Rule 102 is configured by Ansible", protocol="ipv6-icmp", icmp=dict(type=7), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6-name INBOUND default-action 'accept'", "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6-name INBOUND enable-default-log", "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 disable", "set firewall ipv6-name INBOUND rule 101 action 'accept'", "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'", "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request", "set firewall ipv6-name INBOUND rule 101 log 'enable'", "set firewall ipv6-name INBOUND rule 102", "set firewall ipv6-name INBOUND rule 102 action 'reject'", "set firewall ipv6-name INBOUND rule 102 description 'Rule 102 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 102 protocol 'ipv6-icmp'", 'set firewall ipv6-name INBOUND rule 102 icmpv6 type 7', ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_jump_rules_merged_01(self): """Test if plugin correctly adds rule set with a jump action """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set with a jump action", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="jump", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", icmp=dict(type_name="echo-request"), jump_target="PROTECT-RE", packet_length_exclude=[dict(length=100), 
dict(length=200)] ), dict( number="102", action="reject", description="Rule 102 is configured by Ansible", protocol="ipv6-icmp", icmp=dict(type=7), ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND default-action 'accept'", "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'", "set firewall ipv6-name INBOUND enable-default-log", "set firewall ipv6-name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6-name INBOUND rule 101 packet-length-exclude 100", "set firewall ipv6-name INBOUND rule 101 packet-length-exclude 200", "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 ipsec 'match-ipsec'", "set firewall ipv6-name INBOUND rule 101 icmpv6 type echo-request", "set firewall ipv6-name INBOUND rule 101 action 'jump'", "set firewall ipv6-name INBOUND rule 101 jump-target 'PROTECT-RE'", "set firewall ipv6-name INBOUND rule 102", "set firewall ipv6-name INBOUND rule 102 action 'reject'", "set firewall ipv6-name INBOUND rule 102 description 'Rule 102 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 102 protocol 'ipv6-icmp'", 'set firewall ipv6-name INBOUND rule 102 icmpv6 type 7', ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_log_merged_01(self): """Test if new stanza log is correctly applied""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set with a log", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", log="enable", ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6-name INBOUND default-action 'accept'", "set firewall ipv6-name INBOUND description 'This is IPv6 INBOUND rule set with a log'", "set firewall ipv6-name INBOUND 
enable-default-log", "set firewall ipv6-name INBOUND rule 101 log 'enable'", "set firewall ipv6-name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6-name INBOUND rule 101", "set firewall ipv6-name INBOUND rule 101 action 'accept'", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) def test_vyos_firewall_log_replace_01(self): """Test that stanza is correctly replaced without touching the other stanzas """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=200)], packet_length=[dict(length=22)], log="enable", ), ], ), ], ) ], state="replaced", ) ) commands = [ "delete firewall name V4-INGRESS rule 101", "set firewall name V4-INGRESS rule 101", "set firewall name V4-INGRESS rule 101 action 'accept'", "set firewall name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'", "set firewall name V4-INGRESS rule 101 packet-length-exclude 100", "set firewall name V4-INGRESS rule 101 packet-length-exclude 200", "set firewall name V4-INGRESS rule 101 packet-length 22", "set firewall name V4-INGRESS rule 101 log 'enable'", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py b/tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py index 547b8f45..64884282 100644 --- a/tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py +++ b/tests/unit/modules/network/vyos/test_vyos_firewall_rules14.py 
@@ -1,1863 +1,1896 @@ # (c) 2016 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see . # Make coding more python3-ish from __future__ import absolute_import, division, print_function __metaclass__ = type from unittest.mock import patch from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_rules from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args from .vyos_module import TestVyosModule, load_fixture class TestVyosFirewallRulesModule14(TestVyosModule): module = vyos_firewall_rules def setUp(self): super(TestVyosFirewallRulesModule14, self).setUp() self.mock_get_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config" ) self.get_config = self.mock_get_config.start() self.mock_load_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config" ) self.load_config = self.mock_load_config.start() self.mock_get_resource_connection_config = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection" ) self.get_resource_connection_config = self.mock_get_resource_connection_config.start() self.mock_get_resource_connection_facts = patch( "ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection" ) self.get_resource_connection_facts = 
self.mock_get_resource_connection_facts.start() self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.static_routes.static_routes.Static_routesFacts.get_device_data" ) self.mock_execute_show_command = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_rules.firewall_rules.Firewall_rulesFacts.get_device_data" ) self.execute_show_command = self.mock_execute_show_command.start() self.mock_get_os_version = patch( "ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_rules.firewall_rules.get_os_version" ) self.get_os_version = self.mock_get_os_version.start() self.get_os_version.return_value = "1.4" self.maxDiff = None def tearDown(self): super(TestVyosFirewallRulesModule14, self).tearDown() self.mock_get_resource_connection_config.stop() self.mock_get_resource_connection_facts.stop() self.mock_get_config.stop() self.mock_load_config.stop() self.mock_execute_show_command.stop() self.mock_get_os_version.stop() def load_fixtures(self, commands=None, filename=None): def load_from_file(*args, **kwargs): return load_fixture("vyos_firewall_rules_config_v14.cfg") self.execute_show_command.side_effect = load_from_file def test_vyos_firewall_rule_set_01_merged(self): set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="V6-INBOUND", description="This is IPv6 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V6-OUTBOUND", description="This is IPv6 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), dict( afi="ipv4", rule_sets=[ dict( name="V4-INBOUND", description="This is IPv4 INBOUND rule set", default_action="reject", enable_default_log=True, rules=[], ), dict( name="V4-OUTBOUND", description="This is IPv4 OUTBOUND rule set", default_action="accept", enable_default_log=False, rules=[], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name 
V6-INBOUND default-action 'reject'", "set firewall ipv6 name V6-INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6 name V6-INBOUND default-log", "set firewall ipv6 name V6-OUTBOUND default-action 'accept'", "set firewall ipv6 name V6-OUTBOUND description 'This is IPv6 OUTBOUND rule set'", "set firewall ipv4 name V4-INBOUND default-action 'reject'", "set firewall ipv4 name V4-INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall ipv4 name V4-INBOUND default-log", "set firewall ipv4 name V4-OUTBOUND default-action 'accept'", "set firewall ipv4 name V4-OUTBOUND description 'This is IPv4 OUTBOUND rule set'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_packet_length_merged_01(self): """Test if new stanza packet-lenght is correctly applied""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set with a jump action", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="jump", description="Rule 101 is configured by Ansible", jump_target="PROTECT-RE", packet_length_exclude=[dict(length=100), dict(length=200)], packet_length=[dict(length=22)] ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6 name INBOUND default-action 'accept'", "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'", "set firewall ipv6 name INBOUND default-log", "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100", "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200", "set firewall ipv6 name INBOUND rule 101 packet-length 22", "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 101", "set firewall ipv6 name INBOUND rule 101 action 'jump'", "set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'", ] self.maxDiff = None self.execute_module(changed=True, 
commands=commands) def test_vyos_firewall_packet_length_replace_01(self): """Test that stanza is correctly replaced without touching the other stanzas """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=200)], packet_length=[dict(length=22)] ), ], ), ], ) ], state="replaced", ) ) commands = [ "delete firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'", "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 100", "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 200", "set firewall ipv4 name V4-INGRESS rule 101 packet-length 22", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) def test_vyos_firewall_filter_merged_01(self): """Test if new stanza filter is correctly applied""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( filter="input", description="This is IPv6 INBOUND rule set with a jump action", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="jump", description="Rule 101 is configured by Ansible", jump_target="PROTECT-RE", packet_length_exclude=[dict(length=100), dict(length=200)], packet_length=[dict(length=22)] ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6 input filter default-action 'accept'", "set firewall ipv6 input filter description 'This is IPv6 INBOUND rule set with a jump action'", "set firewall ipv6 input filter default-log", "set firewall ipv6 input filter rule 101 packet-length-exclude 100", "set firewall ipv6 input filter rule 101 
packet-length-exclude 200", "set firewall ipv6 input filter rule 101 packet-length 22", "set firewall ipv6 input filter rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 input filter rule 101", "set firewall ipv6 input filter rule 101 action 'jump'", "set firewall ipv6 input filter rule 101 jump-target 'PROTECT-RE'", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) def test_vyos_firewall_interface_merged_01(self): """Test that the rule with a jump action is correctly applied""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", description="This is IPv6 INBOUND rule set with a jump action", default_action="accept", rules=[ dict( number="101", action="jump", description="Rule 101 is configured by Ansible", jump_target="PROTECT-RE", inbound_interface=dict(name="eth0"), outbound_interface=dict(group="eth1"), ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6 name V6-INGRESS description 'This is IPv6 INBOUND rule set with a jump action'", "set firewall ipv6 name V6-INGRESS rule 101 inbound-interface name eth0", "set firewall ipv6 name V6-INGRESS rule 101 outbound-interface group eth1", "set firewall ipv6 name V6-INGRESS rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 name V6-INGRESS rule 101", "set firewall ipv6 name V6-INGRESS rule 101 action 'jump'", "set firewall ipv6 name V6-INGRESS rule 101 jump-target 'PROTECT-RE'", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) def test_vyos_firewall_interface_replace_02(self): """Test that new stanza is correctly replaced without touching the other stanzas """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="IF-TEST", description="Changed", rules=[ dict( number="10", action="accept", description="Rule 10 is configured by Ansible", inbound_interface=dict(name="eth1"), ), ], ), ], ) ], state="replaced", ) ) commands = [ "delete firewall ipv4 
name IF-TEST rule 10", "set firewall ipv4 name IF-TEST rule 10", "set firewall ipv4 name IF-TEST description 'Changed'", "set firewall ipv4 name IF-TEST rule 10 description 'Rule 10 is configured by Ansible'", 'set firewall ipv4 name IF-TEST rule 10 inbound-interface name eth1', "set firewall ipv4 name IF-TEST rule 10 action 'accept'", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_01(self): """Test if plugin correctly adds new rules set and a rule with variant attributes""" set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", description="This is IPv4 INBOUND rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", log="disable", protocol="icmp", fragment="match-frag", disable=True, ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv4 name INBOUND default-action 'accept'", "set firewall ipv4 name INBOUND description 'This is IPv4 INBOUND rule set'", "set firewall ipv4 name INBOUND default-log", "set firewall ipv4 name INBOUND rule 101", "set firewall ipv4 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv4 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv4 name INBOUND rule 101 fragment 'match-frag'", "set firewall ipv4 name INBOUND rule 101 disable", "set firewall ipv4 name INBOUND rule 101 action 'accept'", "set firewall ipv4 name INBOUND rule 101 ipsec 'match-ipsec'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_02(self): """Test that a rule set is correctly applied including variant attributes such as state """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="tcp", source=dict( address="192.0.2.0", mac_address="38:00:25:19:76:0c", port=2127, ), 
destination=dict(address="192.0.1.0", port=2124), limit=dict( burst=10, rate=dict(number=20, unit="second"), ), recent=dict(count=10, time=20), state=dict( established=True, related=True, invalid=True, new=True, ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv4 name INBOUND rule 101 protocol 'tcp'", "set firewall ipv4 name INBOUND rule 101 destination port 2124", "set firewall ipv4 name INBOUND rule 101", "set firewall ipv4 name INBOUND rule 101 destination address 192.0.1.0", "set firewall ipv4 name INBOUND rule 101 source address 192.0.2.0", "set firewall ipv4 name INBOUND rule 101 source mac-address 38:00:25:19:76:0c", "set firewall ipv4 name INBOUND rule 101 source port 2127", "set firewall ipv4 name INBOUND rule 101 state new", "set firewall ipv4 name INBOUND rule 101 state invalid", "set firewall ipv4 name INBOUND rule 101 state related", "set firewall ipv4 name INBOUND rule 101 state established", "set firewall ipv4 name INBOUND rule 101 limit burst 10", "set firewall ipv4 name INBOUND rule 101 limit rate 20/second", "set firewall ipv4 name INBOUND rule 101 recent count 10", "set firewall ipv4 name INBOUND rule 101 recent time 20", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_03(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", destination=dict( group=dict( address_group="OUT-ADDR-GROUP", network_group="OUT-NET-GROUP", port_group="OUT-PORT-GROUP", ), ), source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ), ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv4 name INBOUND rule 101 source group address-group IN-ADDR-GROUP", "set firewall ipv4 name INBOUND rule 101 source group network-group IN-NET-GROUP", "set firewall ipv4 name 
INBOUND rule 101 source group port-group IN-PORT-GROUP", "set firewall ipv4 name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP", "set firewall ipv4 name INBOUND rule 101 destination group network-group OUT-NET-GROUP", "set firewall ipv4 name INBOUND rule 101 destination group port-group OUT-PORT-GROUP", "set firewall ipv4 name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_04(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", time=dict( monthdays="2", startdate="2020-01-24", starttime="13:20:00", stopdate="2020-01-28", stoptime="13:30:00", weekdays="!Sat,Sun", utc=True, ), tcp=dict( flags=[ dict(flag="all"), ] ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv4 name INBOUND rule 101", "set firewall ipv4 name INBOUND rule 101 tcp flags all", "set firewall ipv4 name INBOUND rule 101 time utc", "set firewall ipv4 name INBOUND rule 101 time monthdays 2", "set firewall ipv4 name INBOUND rule 101 time startdate 2020-01-24", "set firewall ipv4 name INBOUND rule 101 time stopdate 2020-01-28", "set firewall ipv4 name INBOUND rule 101 time weekdays !Sat,Sun", "set firewall ipv4 name INBOUND rule 101 time stoptime 13:30:00", "set firewall ipv4 name INBOUND rule 101 time starttime 13:20:00", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_01(self): """Test if plugin correctly adds new ipv6 rules set and a rule with variant attributes""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", 
disable=True, icmp=dict(type_name="echo-request"), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name INBOUND default-action 'accept'", "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6 name INBOUND default-log", "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 101", "set firewall ipv6 name INBOUND rule 101 disable", "set firewall ipv6 name INBOUND rule 101 action 'accept'", "set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'", "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_02(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="tcp", source=dict( address="2001:db8::12", mac_address="38:00:25:19:76:0c", port=2127, ), destination=dict(address="2001:db8::11", port=2124), limit=dict( burst=10, rate=dict(number=20, unit="second"), ), recent=dict(count=10, time=20), state=dict( established=True, related=True, invalid=True, new=True, ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name INBOUND rule 101 protocol 'tcp'", "set firewall ipv6 name INBOUND rule 101 destination address 2001:db8::11", "set firewall ipv6 name INBOUND rule 101 destination port 2124", "set firewall ipv6 name INBOUND rule 101", "set firewall ipv6 name INBOUND rule 101 source address 2001:db8::12", "set firewall ipv6 name INBOUND rule 101 source mac-address 38:00:25:19:76:0c", "set firewall ipv6 name INBOUND rule 101 source port 2127", "set firewall ipv6 name INBOUND rule 101 state new", "set firewall ipv6 name INBOUND rule 101 state invalid", "set firewall 
ipv6 name INBOUND rule 101 state related", "set firewall ipv6 name INBOUND rule 101 state established", "set firewall ipv6 name INBOUND rule 101 limit burst 10", "set firewall ipv6 name INBOUND rule 101 recent count 10", "set firewall ipv6 name INBOUND rule 101 recent time 20", "set firewall ipv6 name INBOUND rule 101 limit rate 20/second", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_03(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", destination=dict( group=dict( address_group="OUT-ADDR-GROUP", network_group="OUT-NET-GROUP", port_group="OUT-PORT-GROUP", ), ), source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ), ), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name INBOUND rule 101 source group address-group IN-ADDR-GROUP", "set firewall ipv6 name INBOUND rule 101 source group network-group IN-NET-GROUP", "set firewall ipv6 name INBOUND rule 101 source group port-group IN-PORT-GROUP", "set firewall ipv6 name INBOUND rule 101 destination group address-group OUT-ADDR-GROUP", "set firewall ipv6 name INBOUND rule 101 destination group network-group OUT-NET-GROUP", "set firewall ipv6 name INBOUND rule 101 destination group port-group OUT-PORT-GROUP", "set firewall ipv6 name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_04(self): """Test that the plugin correctly applies configuration within exsiting rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", time=dict( monthdays="2", startdate="2020-01-24", starttime="13:20:00", stopdate="2020-01-28", stoptime="13:30:00", weekdays="!Sat,Sun", utc=True, ), tcp=dict( 
flags=[ dict(flag="all"), ] ), ), dict( number="102", tcp=dict( flags=[ dict(flag="ack"), dict(flag="syn"), dict(flag="fin", invert=True), ], ) ) ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name INBOUND rule 101", "set firewall ipv6 name INBOUND rule 101 tcp flags all", "set firewall ipv6 name INBOUND rule 101 time utc", "set firewall ipv6 name INBOUND rule 101 time monthdays 2", "set firewall ipv6 name INBOUND rule 101 time startdate 2020-01-24", "set firewall ipv6 name INBOUND rule 101 time stopdate 2020-01-28", "set firewall ipv6 name INBOUND rule 101 time weekdays !Sat,Sun", "set firewall ipv6 name INBOUND rule 101 time stoptime 13:30:00", "set firewall ipv6 name INBOUND rule 101 time starttime 13:20:00", "set firewall ipv6 name INBOUND rule 102", "set firewall ipv6 name INBOUND rule 102 tcp flags ack", "set firewall ipv6 name INBOUND rule 102 tcp flags not fin", "set firewall ipv6 name INBOUND rule 102 tcp flags syn", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_change_state_01(self): """Test that a rule set is replaced applied without touching the other stanzas in particular variant attributes such as state """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="IF-TEST", rules=[ dict( number="10", disable=False, action="accept", state=dict( established=True, new=True, ), ), ], ), ], ), ], state="replaced", ), ) commands = [ "delete firewall ipv4 name IF-TEST rule 10", "set firewall ipv4 name IF-TEST rule 10", "set firewall ipv4 name IF-TEST rule 10 state established", "set firewall ipv4 name IF-TEST rule 10 state new", "set firewall ipv4 name IF-TEST rule 10 action 'accept'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v6_rule_sets_rule_merged_icmp_01(self): """Test if plugin correctly adds new rules with variant attributes within existing ipv6 rule set """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( 
name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type_name="port-unreachable"), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name port-unreachable", "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6 name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_01(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type=1, code=1), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv4 name INBOUND rule 101 icmp type 1", "set firewall ipv4 name INBOUND rule 101 icmp code 1", "set firewall ipv4 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv4 name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_rule_merged_icmp_02(self): """Test if plugin correctly adds new rules with variant attributes within existing rule set """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="INBOUND", rules=[ dict( number="101", protocol="icmp", icmp=dict(type_name="echo-request"), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv4 name INBOUND rule 101 icmp type-name echo-request", "set firewall ipv4 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv4 name INBOUND rule 101", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4_rule_sets_del_01(self): """Test if plugin correctly removes existing rule set """ set_module_args( dict( config=[dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")])], state="deleted", ), ) commands = ["delete firewall ipv4 name V4-INGRESS"] self.execute_module(changed=True, commands=commands) def 
test_vyos_firewall_v4v6_rule_sets_del_02(self): """Test if plugin correctly removes existing rule sets, both ipv4 and ipv6 """ set_module_args( dict( config=[ dict(afi="ipv4", rule_sets=[dict(name="V4-INGRESS")]), dict(afi="ipv6", rule_sets=[dict(name="V6-INGRESS")]), ], state="deleted", ), ) commands = [ "delete firewall ipv4 name V4-INGRESS", "delete firewall ipv6 name V6-INGRESS", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_03(self): """Test that the plugin correctly deprovisions variant configuration """ set_module_args(dict(config=[], state="deleted")) commands = ["delete firewall ipv4", "delete firewall ipv6"] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_del_04(self): """Test if plugin has no effect on non-existent rule sets """ set_module_args( dict( config=[ dict(afi="ipv4", rule_sets=[dict(name="V4-ING")]), dict(afi="ipv6", rule_sets=[dict(name="V6-ING")]), ], state="deleted", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_rep_01(self): """Test if plugin correctly replaces a particular rule set(s) without affecting the others """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="reject", description="Rule 101 is configured by Ansible RM", ipsec="match-ipsec", protocol="tcp", fragment="match-frag", disable=False, ), dict( number="102", action="accept", description="Rule 102 is configured by Ansible RM", protocol="icmp", disable=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", description="This rule-set is configured by Ansible RM", ), dict( name="EGRESS", default_action="reject", description="This rule-set is configured by Ansible RM", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, 
), ], ), ], ), ], state="replaced", ), ) commands = [ "delete firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS description 'This is IPv4 INGRESS rule set'", "set firewall ipv4 name V4-INGRESS rule 101 fragment 'match-frag'", "set firewall ipv4 name V4-INGRESS rule 101 ipsec 'match-ipsec'", "set firewall ipv4 name V4-INGRESS rule 101 protocol 'tcp'", "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible RM'", "set firewall ipv4 name V4-INGRESS rule 101 action 'reject'", "set firewall ipv4 name V4-INGRESS rule 102 disable", "set firewall ipv4 name V4-INGRESS rule 102 action 'accept'", "set firewall ipv4 name V4-INGRESS rule 102 protocol 'icmp'", "set firewall ipv4 name V4-INGRESS rule 102 description 'Rule 102 is configured by Ansible RM'", "set firewall ipv4 name V4-INGRESS rule 102", "set firewall ipv6 name V6-INGRESS description 'This rule-set is configured by Ansible RM'", "set firewall ipv6 name EGRESS description 'This rule-set is configured by Ansible RM'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_rep_02(self): """Test if plugin correctly replaces a particular rule(s) and rule set attribute(s) without affecting the others """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=False, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", fragment="match-frag", disable=True, ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), ], ), ], state="replaced", ), ) commands = [ "delete firewall ipv4 name V4-INGRESS rule 101", "delete firewall 
ipv4 name V4-INGRESS default-log", "set firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'", "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv4 name V4-INGRESS rule 101 disable", "set firewall ipv4 name V4-INGRESS rule 101 fragment 'match-frag'", "set firewall ipv4 name V4-INGRESS rule 101 ipsec 'match-ipsec'", "set firewall ipv4 name V4-INGRESS rule 101 protocol 'icmp'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_01(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=300)], protocol="icmp", disable=True, log="enable", ) ], ), dict( filter="input", rules=[ dict( number="1", action="jump", jump_target="INGRESS", ), ], ), dict( filter="output", rules=[ dict( number="1", action="jump", jump_target="EGRESS", ), ], ), dict( name="IF-TEST", rules=[ dict( number="10", action="accept", icmp=dict(type_name="echo-request"), state=dict(related=True), inbound_interface=dict(name="eth0"), outbound_interface=dict(group="the-ethers"), disable=True, ) ], ), dict( name="EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), dict( filter="input", rules=[ dict( number="1", action="jump", jump_target="V6-INGRESS", ), ], ), dict( filter="output", rules=[ dict( number="1", action="jump", jump_target="EGRESS", ), ], ), ], ), ], state="replaced", ), ) 
self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_rep_idem_02(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=300)], protocol="icmp", disable=True, log="enable", ) ], ), ], ), ], state="replaced", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_mer_idem_01(self): """Test if plugin correctly has no effect if there is no change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=300)], protocol="icmp", disable=True, log="enable", ) ], ), dict( filter="input", rules=[ dict( number="1", action="jump", jump_target="INGRESS", ), ], ), dict( filter="output", rules=[ dict( number="1", action="jump", jump_target="EGRESS", ), ], ), dict( name="IF-TEST", rules=[ dict( number="10", action="accept", icmp=dict(type_name="echo-request"), state=dict(related=True), inbound_interface=dict(name="eth0"), outbound_interface=dict(group="the-ethers"), disable=True, ) ], ), dict( name="EGRESS", default_action="reject", ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), dict( filter="input", rules=[ dict( number="1", action="jump", jump_target="V6-INGRESS", ), ], ), dict( 
filter="output", rules=[ dict( number="1", action="jump", jump_target="EGRESS", ), ], ), ], ), ], state="merged", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_01(self): """Test if plugin correctly resets the entire rule set if there is a change in the configuration """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-IN", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="1", action="reject", description="Rule 1 is configured by Ansible RM", ipsec="match-ipsec", log="enable", protocol="tcp", fragment="match-frag", disable=False, source=dict( group=dict( address_group="IN-ADDR-GROUP", network_group="IN-NET-GROUP", port_group="IN-PORT-GROUP", ), ), ), dict( number="2", action="accept", description="Rule 102 is configured by Ansible RM", protocol="icmp", disable=True, ), ], ), + dict( + name="MULTIPLE-RULE", + default_action="drop", + rules=[ + dict( + number="1", + action="accept", + protocol="all", + ), + ], + ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-IN", default_action="accept", description="This rule-set is configured by Ansible RM", ), dict( name="V6-EG", default_action="reject", description="This rule-set is configured by Ansible RM", ), ], ), ], state="overridden", ), ) commands = [ "delete firewall ipv6 name V6-INGRESS", "delete firewall ipv6 name EGRESS", "delete firewall ipv4 name V4-INGRESS", "delete firewall ipv4 name EGRESS", + "delete firewall ipv4 name MULTIPLE-RULE", "delete firewall ipv4 input filter", "delete firewall ipv4 output filter", "delete firewall ipv6 input filter", "delete firewall ipv6 output filter", "delete firewall ipv4 name IF-TEST", + "set firewall ipv4 name MULTIPLE-RULE default-action 'drop'", + "set firewall ipv4 name MULTIPLE-RULE rule 1", + "set firewall ipv4 name MULTIPLE-RULE rule 1 action 'accept'", + "set firewall ipv4 name MULTIPLE-RULE rule 1 protocol 'all'", "set 
firewall ipv4 name V4-IN default-action 'accept'", "set firewall ipv4 name V4-IN description 'This is IPv4 INGRESS rule set'", "set firewall ipv4 name V4-IN default-log", "set firewall ipv4 name V4-IN rule 1 protocol 'tcp'", "set firewall ipv4 name V4-IN rule 1 log", "set firewall ipv4 name V4-IN rule 1 description 'Rule 1 is configured by Ansible RM'", "set firewall ipv4 name V4-IN rule 1 fragment 'match-frag'", "set firewall ipv4 name V4-IN rule 1 source group address-group IN-ADDR-GROUP", "set firewall ipv4 name V4-IN rule 1 source group network-group IN-NET-GROUP", "set firewall ipv4 name V4-IN rule 1 source group port-group IN-PORT-GROUP", "set firewall ipv4 name V4-IN rule 1", "set firewall ipv4 name V4-IN rule 1 action 'reject'", "set firewall ipv4 name V4-IN rule 1 ipsec 'match-ipsec'", "set firewall ipv4 name V4-IN rule 2 disable", "set firewall ipv4 name V4-IN rule 2 action 'accept'", "set firewall ipv4 name V4-IN rule 2 protocol 'icmp'", "set firewall ipv4 name V4-IN rule 2 description 'Rule 102 is configured by Ansible RM'", "set firewall ipv4 name V4-IN rule 2", "set firewall ipv6 name V6-IN default-action 'accept'", "set firewall ipv6 name V6-IN description 'This rule-set is configured by Ansible RM'", "set firewall ipv6 name V6-EG default-action 'reject'", "set firewall ipv6 name V6-EG description 'This rule-set is configured by Ansible RM'", ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_02(self): """Test that the plugin correctly resets the entire rule sets configuration if changes are detected """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", protocol="udp", ), ], ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="EGRESS", default_action="reject", description="This rule-set is configured by Ansible RM", rules=[ 
dict( number="20", action="accept", protocol="udp", ), ], ), ], ), ], state="overridden", ), ) commands = [ "delete firewall ipv6 name V6-INGRESS", "delete firewall ipv6 name EGRESS", "delete firewall ipv4 name V4-INGRESS", "delete firewall ipv4 name EGRESS", "delete firewall ipv4 input filter", "delete firewall ipv4 output filter", "delete firewall ipv6 input filter", "delete firewall ipv6 output filter", "delete firewall ipv4 name IF-TEST", + "delete firewall ipv4 name MULTIPLE-RULE", "set firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS default-log", "set firewall ipv4 name V4-INGRESS description 'This is IPv4 INGRESS rule set'", "set firewall ipv4 name V4-INGRESS default-action 'accept'", "set firewall ipv4 name V4-INGRESS rule 101 protocol 'udp'", "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'", "set firewall ipv6 name EGRESS description 'This rule-set is configured by Ansible RM'", "set firewall ipv6 name EGRESS default-action 'reject'", "set firewall ipv6 name EGRESS rule 20", "set firewall ipv6 name EGRESS rule 20 protocol 'udp'", "set firewall ipv6 name EGRESS rule 20 action 'accept'" ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_v4v6_rule_sets_rule_ovr_idem_01(self): """Test that the plugin is idempotent in overridden state if there are no changes to the rule sets """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=300)], protocol="icmp", disable=True, log="enable", ) ], ), dict( filter="input", rules=[ dict( number="1", action="jump", jump_target="INGRESS", ), ], ), dict( filter="output", rules=[ dict( number="1", action="jump", jump_target="EGRESS", ), ], ), dict( name="IF-TEST", rules=[ dict( 
number="10", action="accept", icmp=dict(type_name="echo-request"), state=dict(related=True), inbound_interface=dict(name="eth0"), outbound_interface=dict(group="the-ethers"), disable=True, ) ], ), dict( name="EGRESS", default_action="reject", ), + dict( + name="MULTIPLE-RULE", + default_action="drop", + rules=[ + dict( + number="1", + action="accept", + protocol="all", + ), + dict( + number="2", + action="drop", + protocol="all", + ), + ], + ), ], ), dict( afi="ipv6", rule_sets=[ dict( name="V6-INGRESS", default_action="accept", ), dict( name="EGRESS", default_action="reject", rules=[ dict( icmp=dict(type_name="echo-request"), number=20, ), ], ), dict( filter="input", rules=[ dict( number="1", action="jump", jump_target="V6-INGRESS", ), ], ), dict( filter="output", rules=[ dict( number="1", action="jump", jump_target="EGRESS", ), ], ), ], ), ], state="overridden", ), ) self.execute_module(changed=False, commands=[]) def test_vyos_firewall_v6_rule_sets_rule_merged_01_version(self): """Test if plugin correctly adds ipv6 rule set with rules """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", disable=True, icmp=dict(type_name="echo-request"), log="enable", ), dict( number="102", action="reject", description="Rule 102 is configured by Ansible", protocol="ipv6-icmp", icmp=dict(type=7), ), ], ), ], ), ], state="merged", ), ) commands = [ "set firewall ipv6 name INBOUND default-action 'accept'", "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set'", "set firewall ipv6 name INBOUND default-log", "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 101", "set 
firewall ipv6 name INBOUND rule 101 disable", "set firewall ipv6 name INBOUND rule 101 action 'accept'", "set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'", "set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request", "set firewall ipv6 name INBOUND rule 101 log", "set firewall ipv6 name INBOUND rule 102", "set firewall ipv6 name INBOUND rule 102 action 'reject'", "set firewall ipv6 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 102 protocol 'ipv6-icmp'", 'set firewall ipv6 name INBOUND rule 102 icmpv6 type 7', ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_jump_rules_merged_01(self): """Test if plugin correctly adds rule set with a jump action """ set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set with a jump action", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="jump", description="Rule 101 is configured by Ansible", ipsec="match-ipsec", protocol="icmp", icmp=dict(type_name="echo-request"), jump_target="PROTECT-RE", packet_length_exclude=[dict(length=100), dict(length=200)] ), dict( number="102", action="reject", description="Rule 102 is configured by Ansible", protocol="ipv6-icmp", icmp=dict(type=7), ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6 name INBOUND default-action 'accept'", "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a jump action'", "set firewall ipv6 name INBOUND default-log", "set firewall ipv6 name INBOUND rule 101 protocol 'icmp'", "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 100", "set firewall ipv6 name INBOUND rule 101 packet-length-exclude 200", "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 101", "set firewall ipv6 name INBOUND rule 101 ipsec 'match-ipsec'", 
"set firewall ipv6 name INBOUND rule 101 icmpv6 type-name echo-request", "set firewall ipv6 name INBOUND rule 101 action 'jump'", "set firewall ipv6 name INBOUND rule 101 jump-target 'PROTECT-RE'", "set firewall ipv6 name INBOUND rule 102", "set firewall ipv6 name INBOUND rule 102 action 'reject'", "set firewall ipv6 name INBOUND rule 102 description 'Rule 102 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 102 protocol 'ipv6-icmp'", 'set firewall ipv6 name INBOUND rule 102 icmpv6 type 7', ] self.execute_module(changed=True, commands=commands) def test_vyos_firewall_log_merged_01(self): """Test if new stanza log is correctly applied""" set_module_args( dict( config=[ dict( afi="ipv6", rule_sets=[ dict( name="INBOUND", description="This is IPv6 INBOUND rule set with a log", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", log="enable", ), ], ), ], ) ], state="merged", ) ) commands = [ "set firewall ipv6 name INBOUND default-action 'accept'", "set firewall ipv6 name INBOUND description 'This is IPv6 INBOUND rule set with a log'", "set firewall ipv6 name INBOUND default-log", "set firewall ipv6 name INBOUND rule 101 log", "set firewall ipv6 name INBOUND rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv6 name INBOUND rule 101", "set firewall ipv6 name INBOUND rule 101 action 'accept'", ] self.maxDiff = None self.execute_module(changed=True, commands=commands) def test_vyos_firewall_log_replace_01(self): """Test that stanza is correctly replaced without touching the other stanzas """ set_module_args( dict( config=[ dict( afi="ipv4", rule_sets=[ dict( name="V4-INGRESS", description="This is IPv4 V4-INGRESS rule set", default_action="accept", enable_default_log=True, rules=[ dict( number="101", action="accept", description="Rule 101 is configured by Ansible", packet_length_exclude=[dict(length=100), dict(length=200)], 
packet_length=[dict(length=22)], log="enable", ), ], ), ], ) ], state="replaced", ) ) commands = [ "delete firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS rule 101", "set firewall ipv4 name V4-INGRESS rule 101 action 'accept'", "set firewall ipv4 name V4-INGRESS rule 101 description 'Rule 101 is configured by Ansible'", "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 100", "set firewall ipv4 name V4-INGRESS rule 101 packet-length-exclude 200", "set firewall ipv4 name V4-INGRESS rule 101 packet-length 22", "set firewall ipv4 name V4-INGRESS rule 101 log", ] self.maxDiff = None self.execute_module(changed=True, commands=commands)