diff --git a/changelogs/fragments/T7284-delete_firewall_description.yml b/changelogs/fragments/T7284-delete_firewall_description.yml
new file mode 100644
index 00000000..fe2b1882
--- /dev/null
+++ b/changelogs/fragments/T7284-delete_firewall_description.yml
@@ -0,0 +1,2 @@
+bugfixes:
+ - vyos_firewall_rules - Allow deleting of firewall description.
diff --git a/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py b/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py
index 34dc0ed6..e2a25e32 100644
--- a/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py
+++ b/plugins/module_utils/network/vyos/config/firewall_global/firewall_global.py
@@ -1,770 +1,767 @@
#
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""
The vyos_firewall_global class
It is in this file where the current configuration (as dict)
is compared to the provided configuration (as dict) and the command set
necessary to bring the current configuration to it's desired end-state is
created
"""
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from copy import deepcopy
from ansible.module_utils.six import iteritems
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base import (
ConfigBase,
)
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import (
remove_empties,
to_list,
)
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.facts import Facts
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.utils import (
list_diff_want_only,
+ in_target_not_none,
)
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.vyos import get_os_version
from ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.utils.version import LooseVersion
class Firewall_global(ConfigBase):
"""
The vyos_firewall_global class
"""
gather_subset = ["!all", "!min"]
gather_network_resources = ["firewall_global"]
def __init__(self, module):
super(Firewall_global, self).__init__(module)
def get_firewall_global_facts(self, data=None):
"""Get the 'facts' (the current configuration)
:rtype: A dictionary
:returns: The current configuration as a dictionary
"""
facts, _warnings = Facts(self._module).get_facts(
self.gather_subset,
self.gather_network_resources,
data=data,
)
firewall_global_facts = facts["ansible_network_resources"].get("firewall_global")
if not firewall_global_facts:
return []
return firewall_global_facts
def execute_module(self):
"""Execute the module
:rtype: A dictionary
:returns: The result from module execution
"""
result = {"changed": False}
warnings = list()
commands = list()
if self.state in self.ACTION_STATES:
existing_firewall_global_facts = self.get_firewall_global_facts()
else:
existing_firewall_global_facts = []
if self.state in self.ACTION_STATES or self.state == "rendered":
commands.extend(self.set_config(existing_firewall_global_facts))
if commands and self.state in self.ACTION_STATES:
if not self._module.check_mode:
self._connection.edit_config(commands)
result["changed"] = True
if self.state in self.ACTION_STATES:
result["commands"] = commands
if self.state in self.ACTION_STATES or self.state == "gathered":
changed_firewall_global_facts = self.get_firewall_global_facts()
elif self.state == "rendered":
result["rendered"] = commands
elif self.state == "parsed":
running_config = self._module.params["running_config"]
if not running_config:
self._module.fail_json(
msg="value of running_config parameter must not be empty for state parsed",
)
result["parsed"] = self.get_firewall_global_facts(data=running_config)
else:
changed_firewall_global_facts = []
if self.state in self.ACTION_STATES:
result["before"] = existing_firewall_global_facts
if result["changed"]:
result["after"] = changed_firewall_global_facts
elif self.state == "gathered":
result["gathered"] = changed_firewall_global_facts
result["warnings"] = warnings
return result
def set_config(self, existing_firewall_global_facts):
"""Collect the configuration from the args passed to the module,
collect the current configuration (as a dict from facts)
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
want = self._module.params["config"]
have = existing_firewall_global_facts
resp = self.set_state(want, have)
return to_list(resp)
def set_state(self, w, h):
"""Select the appropriate function based on the state provided
:param want: the desired configuration as a dictionary
:param have: the current configuration as a dictionary
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if self.state in ("merged", "replaced", "rendered") and not w:
self._module.fail_json(
msg="value of config parameter must not be empty for state {0}".format(self.state),
)
if self.state == "deleted":
commands.extend(self._state_deleted(want=None, have=h))
elif w:
if self.state == "merged" or self.state == "rendered":
commands.extend(self._state_merged(w, h))
elif self.state == "replaced":
commands.extend(self._state_replaced(w, h))
return commands
def _state_replaced(self, w, h):
"""The command generator when state is replaced
:rtype: A list
:returns: the commands necessary to migrate the current configuration
to the desired configuration
"""
commands = []
if h:
commands.extend(self._state_deleted(h, w))
commands.extend(self._state_merged(w, h))
return commands
def _state_merged(self, want, have):
"""The command generator when state is merged
:rtype: A list
:returns: the commands necessary to merge the provided into
the current configuration
"""
commands = []
commands.extend(self._add_global_attr(want, have))
return commands
def _state_deleted(self, want, have):
"""The command generator when state is deleted
:rtype: A list
:returns: the commands necessary to remove the current configuration
of the provided objects
"""
commands = []
b_set = (
"config_trap",
"validation",
"log_martians",
"syn_cookies",
"twa_hazards_protection",
)
if want:
for key, val in iteritems(want):
if val and key in b_set and not have:
commands.append(self._form_attr_cmd(attr=key, opr=False))
elif val and key in b_set and have and key in have and have[key] != val:
commands.append(self._form_attr_cmd(attr=key, opr=False))
else:
commands.extend(self._render_attr_config(want, have, key))
elif not want and have:
commands.append(self._compute_command(opr=False))
elif have:
for key, val in iteritems(have):
if val and key in b_set:
commands.append(self._form_attr_cmd(attr=key, opr=False))
else:
commands.extend(self._render_attr_config(want, have, key))
return commands
def _render_attr_config(self, w, h, key, opr=False):
"""
This function invoke the function to extend commands
based on the key.
:param w: the desired configuration.
:param h: the current configuration.
:param key: attribute name
:param opr: operation
:return: list of commands
"""
commands = []
if key == "ping":
commands.extend(self._render_ping(key, w, h, opr=opr))
elif key == "group":
commands.extend(self._render_group(key, w, h, opr=opr))
elif key == "state_policy":
commands.extend(self._render_state_policy(key, w, h, opr=opr))
elif key == "route_redirects":
commands.extend(self._render_route_redirects(key, w, h, opr=opr))
return commands
def _add_global_attr(self, w, h, opr=True):
"""
This function forms the set/delete commands based on the 'opr' type
for firewall_global attributes.
:param w: the desired config.
:param h: the target config.
:param opr: True/False.
:return: generated commands list.
"""
commands = []
w_fg = deepcopy(remove_empties(w))
l_set = (
"config_trap",
"validation",
"log_martians",
"syn_cookies",
"twa_hazards_protection",
)
if w_fg:
for key, val in iteritems(w_fg):
if opr and key in l_set and not (h and self._is_w_same(w_fg, h, key)):
commands.append(
self._form_attr_cmd(attr=key, val=self._bool_to_str(val), opr=opr),
)
elif not opr:
if key and self._is_del(l_set, h):
commands.append(
self._form_attr_cmd(attr=key, key=self._bool_to_str(val), opr=opr),
)
continue
if (
key in l_set
and not self._in_target(h, key)
and not self._is_del(l_set, h)
):
commands.append(
self._form_attr_cmd(attr=key, val=self._bool_to_str(val), opr=opr),
)
else:
commands.extend(self._render_attr_config(w_fg, h, key, opr))
return commands
def _render_ping(self, attr, w, h, opr):
"""
This function forms the commands for 'ping' attributes based on the 'opr'.
:param attr: attribute name.
:param w: the desired configuration.
:param h: the target config.
:param opr: True/False.
:return: generated list of commands.
"""
commands = []
h_ping = {}
l_set = ("all", "broadcast")
if h:
h_ping = h.get(attr) or {}
if self._is_root_del(w[attr], h_ping, attr):
for item, value in iteritems(h[attr]):
if not opr and item in l_set:
commands.append(self._form_attr_cmd(attr=item, opr=opr))
elif w[attr]:
if h and attr in h.keys():
h_ping = h.get(attr) or {}
for item, value in iteritems(w[attr]):
if (
opr
and item in l_set
and not (h_ping and self._is_w_same(w[attr], h_ping, item))
):
commands.append(
self._form_attr_cmd(attr=item, val=self._bool_to_str(value), opr=opr),
)
elif (
not opr
and item in l_set
and not (h_ping and self._is_w_same(w[attr], h_ping, item))
):
commands.append(self._form_attr_cmd(attr=item, opr=opr))
return commands
def _render_group(self, attr, w, h, opr):
"""
This function forms the commands for 'group' attribute based on the 'opr'.
:param attr: attribute name.
:param w: base config.
:param h: target config.
:param opr: True/False.
:return: generated list of commands.
"""
commands = []
h_grp = {}
if not opr and self._is_root_del(h, w, attr):
commands.append(self._form_attr_cmd(attr=attr, opr=opr))
else:
if h:
h_grp = h.get("group") or {}
if w:
commands.extend(self._render_grp_mem("port_group", w["group"], h_grp, opr))
commands.extend(self._render_grp_mem("address_group", w["group"], h_grp, opr))
commands.extend(self._render_grp_mem("network_group", w["group"], h_grp, opr))
return commands
def _render_grp_mem(self, attr, w, h, opr):
"""
This function forms the commands for group list/members attributes based on the 'opr'.
:param attr: attribute name.
:param w: the desired config.
:param h: the target config.
:param opr: True/False.
:return: generated list of commands.
"""
commands = []
h_grp = []
w_grp = []
l_set = ("name", "description")
if w:
w_grp = w.get(attr) or []
if h:
h_grp = h.get(attr) or []
if w_grp:
for want in w_grp:
h = self.search_attrib_in_have(h_grp, want, "name")
if "afi" in want and want["afi"] == "ipv6":
cmd = self._compute_command(key="group", attr="ipv6-" + attr, opr=opr)
else:
cmd = self._compute_command(key="group", attr=attr, opr=opr)
for key, val in iteritems(want):
if val:
if opr and key in l_set and not (h and self._is_w_same(want, h, key)):
if key == "name":
commands.append(cmd + " " + str(val))
else:
commands.append(
cmd
+ " "
+ want["name"]
+ " "
+ key
+ " '"
+ str(want[key])
+ "'",
)
elif not opr and key in l_set:
if key == "name" and self._is_grp_del(h, want, key):
commands.append(cmd + " " + want["name"])
continue
- if not (h and self._in_target(h, key)) and not self._is_grp_del(
- h,
- want,
- key,
- ):
+ if not (h and in_target_not_none(h, key)) and not self._is_grp_del(h, want, "name"):
commands.append(cmd + " " + want["name"] + " " + key)
elif key == "members":
commands.extend(
self._render_ports_addrs(
key,
want,
h,
opr,
cmd,
want["name"],
attr,
),
)
return commands
def _render_ports_addrs(self, attr, w, h, opr, cmd, name, type):
"""
This function forms the commands for port/address/network group members
based on the 'opr'.
:param attr: attribute name.
:param w: the desired config.
:param h: the target config.
:param cmd: commands to be prepend.
:param name: name of group.
:param type: group type.
:return: generated list of commands.
"""
commands = []
have = []
if w:
want = w.get(attr) or []
if h:
have = h.get(attr) or []
if want:
if opr:
members = list_diff_want_only(want, have)
for member in members:
commands.append(
cmd
+ " "
+ name
+ " "
+ self._grp_type(type)
+ " "
+ member[self._get_mem_type(type)],
)
elif not opr and have:
members = list_diff_want_only(want, have)
for member in members:
commands.append(
cmd
+ " "
+ name
+ " "
+ self._grp_type(type)
+ " "
+ member[self._get_mem_type(type)],
)
return commands
def _get_mem_type(self, group):
"""
This function returns the member type
based on the type of group.
"""
return "port" if group == "port_group" else "address"
def _render_state_policy(self, attr, w, h, opr):
"""
This function forms the commands for 'state-policy' attributes
based on the 'opr'.
:param attr: attribute name.
:param w: the desired config.
:param h: the target config.
:param opr: True/False.
:return: generated list of commands.
"""
commands = []
have = []
if LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
l_set = ("log", "action", "connection_type", "log_level")
else:
l_set = ("log", "action", "connection_type")
if not opr and self._is_root_del(h, w, attr):
commands.append(self._form_attr_cmd(attr=attr, opr=opr))
else:
w_sp = deepcopy(remove_empties(w))
want = w_sp.get(attr) or []
if h:
have = h.get(attr) or []
if want:
for w in want:
h = self.search_attrib_in_have(have, w, "connection_type")
for key, val in iteritems(w):
if val and key != "connection_type":
if opr and key in l_set and not (h and self._is_w_same(w, h, key)):
if key == "log" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
commands.append(
self._form_attr_cmd(
key=attr + " " + w["connection_type"],
attr=key,
opr=opr,
),
)
else:
commands.append(
self._form_attr_cmd(
key=attr + " " + w["connection_type"],
attr=key,
val=self._bool_to_str(val),
opr=opr,
),
)
elif not opr and key in l_set:
if not h:
commands.append(
self._form_attr_cmd(
attr=attr + " " + w["connection_type"],
opr=opr,
),
)
break # delete the whole thing and move on
if (not self._in_target(h, key) or h[key] is None) and (self._in_target(w, key) and w[key]):
# delete if not being replaced and value currently exists
commands.append(
self._form_attr_cmd(
attr=attr + " " + w["connection_type"] + " " + key,
val=self._bool_to_str(val),
opr=opr,
),
)
return commands
def _render_route_redirects(self, attr, w, h, opr):
"""
This function forms the commands for 'route_redirects' attributes based on the 'opr'.
:param attr: attribute name.
:param w: the desired config.
:param h: the target config.
:param opr: True/False.
:return: generated list of commands.
"""
commands = []
have = []
l_set = ("afi", "ip_src_route")
if w:
want = w.get(attr) or []
if h:
have = h.get(attr) or []
if want:
for w in want:
h = self.search_attrib_in_have(have, w, "afi")
if 'afi' in w:
afi = w['afi']
else:
if h and 'afi' in h:
afi = h['afi']
else:
afi = None
afi = None
for key, val in iteritems(w):
if val and key != "afi":
if opr and key in l_set and not (h and self._is_w_same(w, h, key)):
commands.append(
self._form_attr_cmd(
attr=key,
val=self._bool_to_str(val),
opr=opr,
type=afi
),
)
elif not opr and key in l_set:
if self._is_del(l_set, h):
commands.append(
self._form_attr_cmd(
attr=key,
val=self._bool_to_str(val),
opr=opr,
type=afi
),
)
continue
if not (h and self._in_target(h, key)) and not self._is_del(l_set, h):
commands.append(
self._form_attr_cmd(
attr=key,
val=self._bool_to_str(val),
opr=opr,
type=afi
),
)
elif key == "icmp_redirects":
commands.extend(self._render_icmp_redirects(key, w, h, opr))
return commands
def _render_icmp_redirects(self, attr, w, h, opr):
"""
This function forms the commands for 'icmp_redirects' attributes
based on the 'opr'.
:param attr: attribute name.
:param w: the desired config.
:param h: the target config.
:param opr: True/False.
:return: generated list of commands.
"""
commands = []
h_red = {}
l_set = ("send", "receive")
if w and 'afi' in w:
afi = w['afi']
else:
if h and 'afi' in h:
afi = h['afi']
else:
afi = None
if w[attr]:
if h and attr in h.keys():
h_red = h.get(attr) or {}
for item, value in iteritems(w[attr]):
if opr and item in l_set and not (h_red and self._is_w_same(w[attr], h_red, item)):
commands.append(
self._form_attr_cmd(attr=item, val=self._bool_to_str(value), opr=opr, type=afi)
)
elif (
not opr
and item in l_set
and not (h_red and self._is_w_same(w[attr], h_red, item))
):
commands.append(self._form_attr_cmd(attr=item, opr=opr, type=afi))
return commands
def search_attrib_in_have(self, have, want, attr):
"""
This function returns the attribute if it is present in target config.
:param have: the target config.
:param want: the desired config.
:param attr: attribute name .
:return: attribute/None
"""
if have:
for h in have:
if h[attr] == want[attr]:
return h
return None
def _form_attr_cmd(self, key=None, attr=None, val=None, opr=True, type=None):
"""
This function forms the command for leaf attribute.
:param key: parent key.
:param attr: attribute name
:param value: value
:param opr: True/False.
:param type: AF type of attribute.
:return: generated command.
"""
command = self._compute_command(key=key, attr=self._map_attrib(attr, type=type), val=val, opr=opr)
return command
def _compute_command(self, key=None, attr=None, val=None, remove=False, opr=True):
"""
This function construct the add/delete command based on passed attributes.
:param key: parent key.
:param attr: attribute name
:param value: value
:param remove: True/False.
:param opr: True/False.
:return: generated command.
"""
if remove or not opr:
cmd = "delete firewall "
else:
cmd = "set firewall "
if attr and key != "group" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
cmd += "global-options "
if key:
cmd += key.replace("_", "-") + " "
if attr:
cmd += attr.replace("_", "-")
if val and opr:
if key == "state_policy" and LooseVersion(get_os_version(self._module)) >= LooseVersion("1.4"):
cmd += ""
else:
cmd += " '" + str(val) + "'"
return cmd.strip()
def _bool_to_str(self, val):
"""
This function converts the bool value into string.
:param val: bool value.
:return: enable/disable.
"""
return "enable" if str(val) == "True" else "disable" if str(val) == "False" else val
def _grp_type(self, val):
"""
This function returns the group member type based on value argument.
:param val: value.
:return: member type.
"""
return (
"address" if val == "address_group" else "network" if val == "network_group" else "port"
)
def _is_w_same(self, w, h, key):
"""
This function checks whether the key value is same in desired and
target config dictionary.
:param w: base config.
:param h: target config.
:param key:attribute name.
:return: True/False.
"""
return True if h and key in h and h[key] == w[key] else False
def _in_target(self, h, key):
"""
This function checks whether the target exist and key present in target config.
:param h: target config.
:param key: attribute name.
:return: True/False.
"""
return True if h and key in h else False
def _is_grp_del(self, w, h, key):
"""
This function checks whether group needed to be deleted based on
desired and target configs.
:param w: the desired config.
:param h: the target config.
:param key: group name.
:return: True/False.
"""
return True if h and key in h and (not w or key not in w or not w[key]) else False
def _is_root_del(self, w, h, key):
"""
This function checks whether a root attribute which can have
further child attributes needed to be deleted.
:param w: the desired config.
:param h: the target config.
:param key: attribute name.
:return: True/False.
"""
return True if h and key in h and (not w or key not in w or not w[key]) else False
def _is_del(self, b_set, h, key="number"):
"""
This function checks whether attribute needs to be deleted
when operation is false and attribute present in present target config.
:param b_set: attribute set.
:param h: target config.
:param key: number.
:return: True/False.
"""
return key in b_set and not self._in_target(h, key)
def _map_attrib(self, attrib, type=None):
"""
- This function construct the regex string.
- replace the underscore with hyphen.
:param attrib: attribute
:return: regex string
"""
regex = attrib.replace("_", "-")
if attrib == "send":
if type == "ipv6":
regex = "ipv6-send-redirects"
else:
regex = "send-redirects"
elif attrib == "ip_src_route":
if type == "ipv6":
regex = "ipv6-src-route"
elif attrib == "receive":
if type == "ipv6":
regex = "ipv6-receive-redirects"
else:
regex = "receive-redirects"
elif attrib == "disabled":
regex = "disable"
elif attrib == "all":
regex = "all-ping"
elif attrib == "broadcast":
regex = "broadcast-ping"
elif attrib == "validation":
regex = "source-validation"
return regex
diff --git a/plugins/module_utils/network/vyos/utils/utils.py b/plugins/module_utils/network/vyos/utils/utils.py
index a6b03c80..4c371962 100644
--- a/plugins/module_utils/network/vyos/utils/utils.py
+++ b/plugins/module_utils/network/vyos/utils/utils.py
@@ -1,278 +1,288 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# utils
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.six import iteritems
try:
import ipaddress
HAS_IPADDRESS = True
except ImportError:
HAS_IPADDRESS = False
def search_obj_in_list(name, lst, key="name"):
if lst:
for item in lst:
if item[key] == name:
return item
return None
def get_interface_type(interface):
"""Gets the type of interface"""
if interface.startswith("eth"):
return "ethernet"
elif interface.startswith("bond"):
return "bonding"
elif interface.startswith("vti"):
return "vti"
elif interface.startswith("lo"):
return "loopback"
elif interface.startswith("vtun"):
return "openvpn"
elif interface.startswith("wg"):
return "wireguard"
elif interface.startswith("tun"):
return "tunnel"
elif interface.startswith("br"):
return "bridge"
elif interface.startswith("dum"):
return "dummy"
def get_interface_with_vif(interface):
"""Gets virtual interface if any or return as is"""
vlan = None
interface_real = interface
if "." in interface:
interface_real, vlan = interface.split(".")
if vlan is not None:
interface_real = interface_real + " vif " + vlan
return interface_real
def dict_delete(base, comparable):
"""
This function generates a dict containing key, value pairs for keys
that are present in the `base` dict but not present in the `comparable`
dict.
:param base: dict object to base the diff on
:param comparable: dict object to compare against base
:returns: new dict object with key, value pairs that needs to be deleted.
"""
to_delete = dict()
for key in base:
if isinstance(base[key], dict):
sub_diff = dict_delete(base[key], comparable.get(key, {}))
if sub_diff:
to_delete[key] = sub_diff
else:
if key not in comparable:
to_delete[key] = base[key]
return to_delete
def diff_list_of_dicts(want, have):
diff = []
set_w = set(tuple(d.items()) for d in want)
set_h = set(tuple(d.items()) for d in have)
difference = set_w.difference(set_h)
for element in difference:
diff.append(dict((x, y) for x, y in element))
return diff
def get_lst_diff_for_dicts(want, have, lst):
"""
This function generates a list containing values
that are only in want and not in list in have dict
:param want: dict object to want
:param have: dict object to have
:param lst: list the diff on
:return: new list object with values which are only in want.
"""
if not have:
diff = want.get(lst) or []
else:
want_elements = want.get(lst) or {}
have_elements = have.get(lst) or {}
diff = list_diff_want_only(want_elements, have_elements)
return diff
def get_lst_same_for_dicts(want, have, lst):
"""
This function generates a list containing values
that are common for list in want and list in have dict
:param want: dict object to want
:param have: dict object to have
:param lst: list the comparison on
:return: new list object with values which are common in want and have.
"""
diff = None
if want and have:
want_list = want.get(lst) or {}
have_list = have.get(lst) or {}
diff = [i for i in want_list and have_list if i in have_list and i in want_list]
return diff
def list_diff_have_only(want_list, have_list):
"""
This function generated the list containing values
that are only in have list.
:param want_list:
:param have_list:
:return: new list with values which are only in have list
"""
if have_list and not want_list:
diff = have_list
elif not have_list:
diff = None
else:
diff = [i for i in have_list + want_list if i in have_list and i not in want_list]
return diff
def list_diff_want_only(want_list, have_list):
"""
This function generated the list containing values
that are only in want list.
:param want_list:
:param have_list:
:return: new list with values which are only in want list
"""
if have_list and not want_list:
diff = None
elif not have_list:
diff = want_list
else:
diff = [i for i in have_list + want_list if i in want_list and i not in have_list]
return diff
def search_dict_tv_in_list(d_val1, d_val2, lst, key1, key2):
"""
This function return the dict object if it exist in list.
:param d_val1:
:param d_val2:
:param lst:
:param key1:
:param key2:
:return:
"""
obj = next(
(item for item in lst if item[key1] == d_val1 and item[key2] == d_val2),
None,
)
if obj:
return obj
else:
return None
def key_value_in_dict(have_key, have_value, want_dict):
"""
This function checks whether the key and values exist in dict
:param have_key:
:param have_value:
:param want_dict:
:return:
"""
for key, value in iteritems(want_dict):
if key == have_key and value == have_value:
return True
return False
def is_dict_element_present(dict, key):
"""
This function checks whether the key is present in dict.
:param dict:
:param key:
:return:
"""
for item in dict:
if item == key:
return True
return False
def get_ip_address_version(address):
"""
This function returns the version of IP address
:param address: IP address
:return:
"""
if not HAS_IPADDRESS:
raise Exception(missing_required_lib("ipaddress"))
try:
address = unicode(address)
except NameError:
address = str(address)
version = ipaddress.ip_address(address.split("/")[0]).version
return version
def get_route_type(address):
"""
This function returns the route type based on IP address
:param address:
:return:
"""
version = get_ip_address_version(address)
if version == 6:
return "route6"
elif version == 4:
return "route"
def _bool_to_str(val):
"""
This function converts the bool value into string.
:param val: bool value.
:return: enable/disable.
"""
return "enable" if str(val) == "True" else "disable" if str(val) == "False" else val
def _is_w_same(w, h, key):
"""
This function checks whether the key value is same in desired and
target config dictionary.
:param w: base config.
:param h: target config.
:param key:attribute name.
:return: True/False.
"""
return True if h and key in h and h[key] == w[key] else False
def _in_target(h, key):
"""
This function checks whether the target exist and key present in target config.
:param h: target config.
:param key: attribute name.
:return: True/False.
"""
return True if h and key in h else False
+
+
+def in_target_not_none(h, key):
+ """
+ This function checks whether the target exist,key present in target config, and the value is not None.
+ :param h: target config.
+ :param key: attribute name.
+ :return: True/False.
+ """
+ return True if h and key in h and h[key] is not None else False
diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_global.py b/tests/unit/modules/network/vyos/test_vyos_firewall_global.py
index 2ecd0621..481cc1dd 100644
--- a/tests/unit/modules/network/vyos/test_vyos_firewall_global.py
+++ b/tests/unit/modules/network/vyos/test_vyos_firewall_global.py
@@ -1,454 +1,455 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see .
# Make coding more python3-ish
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from unittest.mock import patch
from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_global
from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args
from .vyos_module import TestVyosModule, load_fixture
class TestVyosFirewallGlobalModule(TestVyosModule):
module = vyos_firewall_global
def setUp(self):
super(TestVyosFirewallGlobalModule, self).setUp()
self.mock_get_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config",
)
self.get_config = self.mock_get_config.start()
self.mock_load_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config",
)
self.load_config = self.mock_load_config.start()
self.mock_get_resource_connection_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection",
)
self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
self.mock_get_resource_connection_facts = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection",
)
self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
self.mock_execute_show_command = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_global.firewall_global.Firewall_globalFacts.get_device_data",
)
self.mock_get_os_version = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_global.firewall_global.get_os_version",
)
self.get_os_version = self.mock_get_os_version.start()
self.get_os_version.return_value = "1.3"
self.execute_show_command = self.mock_execute_show_command.start()
self.maxDiff = None
def tearDown(self):
super(TestVyosFirewallGlobalModule, self).tearDown()
self.mock_get_resource_connection_config.stop()
self.mock_get_resource_connection_facts.stop()
self.mock_get_config.stop()
self.mock_load_config.stop()
self.mock_execute_show_command.stop()
self.mock_get_os_version.stop()
def load_fixtures(self, commands=None, filename=None):
def load_from_file(*args, **kwargs):
return load_fixture("vyos_firewall_global_config.cfg")
self.execute_show_command.side_effect = load_from_file
def test_vyos_firewall_global_set_01_merged(self):
set_module_args(
dict(
config=dict(
validation="strict",
config_trap=True,
log_martians=True,
syn_cookies=True,
twa_hazards_protection=True,
ping=dict(all=True, broadcast=True),
state_policy=[
dict(
connection_type="established",
action="accept",
log=True,
log_level="emerg",
),
dict(connection_type="invalid", action="reject"),
],
route_redirects=[
dict(
afi="ipv4",
ip_src_route=True,
icmp_redirects=dict(send=True, receive=False),
),
dict(
afi="ipv6",
ip_src_route=True,
icmp_redirects=dict(receive=False),
),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="MGMT-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.1.1"),
dict(address="192.0.1.3"),
dict(address="192.0.1.5"),
],
),
dict(
afi="ipv6",
name="GOOGLE-DNS-v6",
members=[
dict(address="2001:4860:4860::8888"),
dict(address="2001:4860:4860::8844"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="MGMT",
description="This group has the Management network addresses",
members=[dict(address="192.0.1.0/24")],
),
dict(
afi="ipv6",
name="DOCUMENTATION-v6",
description="IPv6 Addresses reserved for documentation per RFC 3849",
members=[
dict(address="2001:0DB8::/32"),
dict(address="3FFF:FFFF::/32"),
],
),
],
port_group=[
dict(
name="TELNET",
description="This group has the telnet ports",
members=[dict(port="23")],
),
],
),
),
state="merged",
),
)
commands = [
"set firewall group address-group MGMT-HOSTS address 192.0.1.1",
"set firewall group address-group MGMT-HOSTS address 192.0.1.3",
"set firewall group address-group MGMT-HOSTS address 192.0.1.5",
"set firewall group address-group MGMT-HOSTS description 'This group has the Management hosts address lists'",
"set firewall group address-group MGMT-HOSTS",
"set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8888",
"set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8844",
"set firewall group ipv6-address-group GOOGLE-DNS-v6",
"set firewall group network-group MGMT network 192.0.1.0/24",
"set firewall group network-group MGMT description 'This group has the Management network addresses'",
"set firewall group network-group MGMT",
"set firewall group ipv6-network-group DOCUMENTATION-v6 network 2001:0DB8::/32",
"set firewall group ipv6-network-group DOCUMENTATION-v6 network 3FFF:FFFF::/32",
"set firewall group ipv6-network-group DOCUMENTATION-v6 description 'IPv6 Addresses reserved for documentation per RFC 3849'",
"set firewall group ipv6-network-group DOCUMENTATION-v6",
"set firewall group port-group TELNET port 23",
"set firewall group port-group TELNET description 'This group has the telnet ports'",
"set firewall group port-group TELNET",
"set firewall ip-src-route 'enable'",
"set firewall receive-redirects 'disable'",
"set firewall config-trap 'enable'",
"set firewall ipv6-receive-redirects 'disable'",
"set firewall state-policy established action 'accept'",
"set firewall state-policy established log 'enable'",
"set firewall state-policy invalid action 'reject'",
"set firewall broadcast-ping 'enable'",
"set firewall all-ping 'enable'",
"set firewall log-martians 'enable'",
"set firewall twa-hazards-protection 'enable'",
"set firewall syn-cookies 'enable'",
"set firewall source-validation 'strict'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_global_set_01_merged_idem(self):
set_module_args(
dict(
config=dict(
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.3"),
dict(address="192.0.2.5"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::1"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
description="This group has the Management network addresses",
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="22")],
),
],
),
),
state="merged",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_global_set_01_replaced(self):
set_module_args(
dict(
config=dict(
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.7"),
dict(address="192.0.2.9"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::2"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
- description="This group has the Management network addresses",
+ # Deleted the description here.
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="2222")],
),
],
),
),
state="replaced",
),
)
commands = [
"delete firewall ipv6-src-route",
"delete firewall send-redirects",
"delete firewall group address-group RND-HOSTS address 192.0.2.3",
"delete firewall group address-group RND-HOSTS address 192.0.2.5",
"set firewall group address-group RND-HOSTS address 192.0.2.7",
"set firewall group address-group RND-HOSTS address 192.0.2.9",
+ "delete firewall group network-group RND description",
"delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1",
"set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2",
"delete firewall group port-group SSH port 22",
"set firewall group port-group SSH port 2222",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_global_set_02_replaced(self):
set_module_args(
dict(
config=dict(
state_policy=[
dict(connection_type="invalid", action="reject"),
dict(connection_type="related", action="drop"),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.7"),
dict(address="192.0.2.9"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::2"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
description="This group has the Management network addresses",
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="2222")],
),
],
),
),
state="replaced",
),
)
commands = [
"delete firewall group address-group RND-HOSTS address 192.0.2.3",
"delete firewall group address-group RND-HOSTS address 192.0.2.5",
"delete firewall ipv6-src-route",
"delete firewall send-redirects",
"set firewall state-policy related action 'drop'",
"set firewall state-policy invalid action 'reject'",
"set firewall group address-group RND-HOSTS address 192.0.2.7",
"set firewall group address-group RND-HOSTS address 192.0.2.9",
"delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1",
"set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2",
"delete firewall group port-group SSH port 22",
"set firewall group port-group SSH port 2222",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_global_set_01_replaced_idem(self):
set_module_args(
dict(
config=dict(
route_redirects=[
dict(ip_src_route=True, afi="ipv6"),
dict(icmp_redirects=dict(send=True), afi="ipv4"),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.3"),
dict(address="192.0.2.5"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::1"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
description="This group has the Management network addresses",
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="22")],
),
],
),
),
state="replaced",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_global_set_01_deleted(self):
set_module_args(dict(config=dict(), state="deleted"))
commands = ["delete firewall"]
self.execute_module(changed=True, commands=commands)
diff --git a/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py b/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py
index f4ae4add..aae4aa83 100644
--- a/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py
+++ b/tests/unit/modules/network/vyos/test_vyos_firewall_global14.py
@@ -1,466 +1,467 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see .
# Make coding more python3-ish
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from unittest.mock import patch
from ansible_collections.vyos.vyos.plugins.modules import vyos_firewall_global
from ansible_collections.vyos.vyos.tests.unit.modules.utils import set_module_args
from .vyos_module import TestVyosModule, load_fixture
class TestVyosFirewallRulesModule14(TestVyosModule):
module = vyos_firewall_global
def setUp(self):
super(TestVyosFirewallRulesModule14, self).setUp()
self.mock_get_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.get_config",
)
self.get_config = self.mock_get_config.start()
self.mock_load_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.network.Config.load_config",
)
self.load_config = self.mock_load_config.start()
self.mock_get_resource_connection_config = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.cfg.base.get_resource_connection",
)
self.get_resource_connection_config = self.mock_get_resource_connection_config.start()
self.mock_get_resource_connection_facts = patch(
"ansible_collections.ansible.netcommon.plugins.module_utils.network.common.facts.facts.get_resource_connection",
)
self.get_resource_connection_facts = self.mock_get_resource_connection_facts.start()
self.mock_execute_show_command = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.facts.firewall_global.firewall_global.Firewall_globalFacts.get_device_data",
)
self.mock_get_os_version = patch(
"ansible_collections.vyos.vyos.plugins.module_utils.network.vyos.config.firewall_global.firewall_global.get_os_version",
)
self.get_os_version = self.mock_get_os_version.start()
self.get_os_version.return_value = "1.4"
self.execute_show_command = self.mock_execute_show_command.start()
self.maxDiff = None
def tearDown(self):
super(TestVyosFirewallRulesModule14, self).tearDown()
self.mock_get_resource_connection_config.stop()
self.mock_get_resource_connection_facts.stop()
self.mock_get_config.stop()
self.mock_load_config.stop()
self.mock_execute_show_command.stop()
self.mock_get_os_version.stop()
def load_fixtures(self, commands=None, filename=None):
def load_from_file(*args, **kwargs):
return load_fixture("vyos_firewall_global_config_v14.cfg")
self.execute_show_command.side_effect = load_from_file
def test_vyos_firewall_global_set_01_merged(self):
set_module_args(
dict(
config=dict(
validation="strict",
config_trap=True,
log_martians=True,
syn_cookies=True,
twa_hazards_protection=True,
ping=dict(all=True, broadcast=True),
state_policy=[
dict(
connection_type="established",
action="accept",
log=True,
log_level="emerg",
),
dict(connection_type="invalid", action="reject"),
],
route_redirects=[
dict(
afi="ipv4",
ip_src_route=True,
icmp_redirects=dict(send=True, receive=False),
),
dict(
afi="ipv6",
ip_src_route=True,
icmp_redirects=dict(receive=False),
),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="MGMT-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.1.1"),
dict(address="192.0.1.3"),
dict(address="192.0.1.5"),
],
),
dict(
afi="ipv6",
name="GOOGLE-DNS-v6",
members=[
dict(address="2001:4860:4860::8888"),
dict(address="2001:4860:4860::8844"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="MGMT",
description="This group has the Management network addresses",
members=[dict(address="192.0.1.0/24")],
),
dict(
afi="ipv6",
name="DOCUMENTATION-v6",
description="IPv6 Addresses reserved for documentation per RFC 3849",
members=[
dict(address="2001:0DB8::/32"),
dict(address="3FFF:FFFF::/32"),
],
),
],
port_group=[
dict(
name="TELNET",
description="This group has the telnet ports",
members=[dict(port="23")],
),
],
),
),
state="merged",
),
)
commands = [
"set firewall group address-group MGMT-HOSTS address 192.0.1.1",
"set firewall group address-group MGMT-HOSTS address 192.0.1.3",
"set firewall group address-group MGMT-HOSTS address 192.0.1.5",
"set firewall group address-group MGMT-HOSTS description 'This group has the Management hosts address lists'",
"set firewall group address-group MGMT-HOSTS",
"set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8888",
"set firewall group ipv6-address-group GOOGLE-DNS-v6 address 2001:4860:4860::8844",
"set firewall group ipv6-address-group GOOGLE-DNS-v6",
"set firewall group network-group MGMT network 192.0.1.0/24",
"set firewall group network-group MGMT description 'This group has the Management network addresses'",
"set firewall group network-group MGMT",
"set firewall group ipv6-network-group DOCUMENTATION-v6 network 2001:0DB8::/32",
"set firewall group ipv6-network-group DOCUMENTATION-v6 network 3FFF:FFFF::/32",
"set firewall group ipv6-network-group DOCUMENTATION-v6 description 'IPv6 Addresses reserved for documentation per RFC 3849'",
"set firewall group ipv6-network-group DOCUMENTATION-v6",
"set firewall group port-group TELNET port 23",
"set firewall group port-group TELNET description 'This group has the telnet ports'",
"set firewall group port-group TELNET",
"set firewall global-options ip-src-route 'enable'",
"set firewall global-options receive-redirects 'disable'",
"set firewall global-options config-trap 'enable'",
"set firewall global-options ipv6-receive-redirects 'disable'",
"set firewall global-options state-policy established action 'accept'",
"set firewall global-options state-policy established log",
"set firewall global-options state-policy established log-level 'emerg'",
"set firewall global-options state-policy invalid action 'reject'",
"set firewall global-options broadcast-ping 'enable'",
"set firewall global-options log-martians 'enable'",
"set firewall global-options twa-hazards-protection 'enable'",
"set firewall global-options syn-cookies 'enable'",
"set firewall global-options source-validation 'strict'",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_global_set_01_merged_idem(self):
set_module_args(
dict(
config=dict(
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.3"),
dict(address="192.0.2.5"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::1"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
description="This group has the Management network addresses",
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="22")],
),
],
),
),
state="merged",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_global_set_01_replaced(self):
set_module_args(
dict(
config=dict(
state_policy=[
dict(connection_type="invalid", action="reject"),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.7"),
dict(address="192.0.2.9"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::2"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
- description="This group has the Management network addresses",
+ # Deleted the description here.
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="2222")],
),
],
),
),
state="replaced",
),
)
commands = [
"delete firewall group address-group RND-HOSTS address 192.0.2.3",
"delete firewall group address-group RND-HOSTS address 192.0.2.5",
"delete firewall global-options all-ping",
"delete firewall global-options state-policy related",
"delete firewall global-options ipv6-src-route",
"delete firewall global-options send-redirects",
"set firewall global-options state-policy invalid action 'reject'",
"set firewall group address-group RND-HOSTS address 192.0.2.7",
"set firewall group address-group RND-HOSTS address 192.0.2.9",
+ "delete firewall group network-group RND description",
"delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1",
"set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2",
"delete firewall group port-group SSH port 22",
"set firewall group port-group SSH port 2222",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_global_set_01_replaced_idem(self):
set_module_args(
dict(
config=dict(
ping=dict(all=True),
route_redirects=[
dict(ip_src_route=True, afi="ipv6"),
dict(icmp_redirects=dict(send=True), afi="ipv4"),
],
state_policy=[
dict(connection_type="related", action="accept", log_level="alert"),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.3"),
dict(address="192.0.2.5"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::1"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
description="This group has the Management network addresses",
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="22")],
),
],
),
),
state="replaced",
),
)
self.execute_module(changed=False, commands=[])
def test_vyos_firewall_global_set_02_replaced(self):
set_module_args(
dict(
config=dict(
state_policy=[
dict(connection_type="invalid", action="reject"),
dict(connection_type="related", action="drop"),
],
group=dict(
address_group=[
dict(
afi="ipv4",
name="RND-HOSTS",
description="This group has the Management hosts address lists",
members=[
dict(address="192.0.2.1"),
dict(address="192.0.2.7"),
dict(address="192.0.2.9"),
],
),
dict(
afi="ipv6",
name="LOCAL-v6",
description="This group has the hosts address lists of this machine",
members=[
dict(address="::1"),
dict(address="fdec:2503:89d6:59b3::2"),
],
),
],
network_group=[
dict(
afi="ipv4",
name="RND",
description="This group has the Management network addresses",
members=[dict(address="192.0.2.0/24")],
),
dict(
afi="ipv6",
name="UNIQUE-LOCAL-v6",
description="This group encompasses the ULA address space in IPv6",
members=[dict(address="fc00::/7")],
),
],
port_group=[
dict(
name="SSH",
description="This group has the ssh ports",
members=[dict(port="2222")],
),
],
),
),
state="replaced",
),
)
commands = [
"delete firewall group address-group RND-HOSTS address 192.0.2.3",
"delete firewall group address-group RND-HOSTS address 192.0.2.5",
"delete firewall global-options all-ping",
"delete firewall global-options ipv6-src-route",
"delete firewall global-options send-redirects",
"set firewall global-options state-policy related action 'drop'",
"delete firewall global-options state-policy related log-level",
"set firewall global-options state-policy invalid action 'reject'",
"set firewall group address-group RND-HOSTS address 192.0.2.7",
"set firewall group address-group RND-HOSTS address 192.0.2.9",
"delete firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::1",
"set firewall group ipv6-address-group LOCAL-v6 address fdec:2503:89d6:59b3::2",
"delete firewall group port-group SSH port 22",
"set firewall group port-group SSH port 2222",
]
self.execute_module(changed=True, commands=commands)
def test_vyos_firewall_global_set_01_deleted(self):
set_module_args(dict(config=dict(), state="deleted"))
commands = ["delete firewall"]
self.execute_module(changed=True, commands=commands)