Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F35450527
zone_policy.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
8 KB
Referenced Files
None
Subscribers
None
zone_policy.py
View Options
#!/usr/bin/env python3
#
# Copyright (C) 2021-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
os
from
json
import
loads
from
sys
import
exit
from
vyos.config
import
Config
from
vyos.configdict
import
dict_merge
from
vyos.template
import
render
from
vyos.util
import
cmd
from
vyos.util
import
dict_search_args
from
vyos.util
import
run
from
vyos.xml
import
defaults
from
vyos
import
ConfigError
from
vyos
import
airbag
airbag
.
enable
()
nftables_conf
=
'/run/nftables_zone.conf'
def
get_config
(
config
=
None
):
if
config
:
conf
=
config
else
:
conf
=
Config
()
base
=
[
'zone-policy'
]
zone_policy
=
conf
.
get_config_dict
(
base
,
key_mangling
=
(
'-'
,
'_'
),
get_first_key
=
True
,
no_tag_node_value_mangle
=
True
)
zone_policy
[
'firewall'
]
=
conf
.
get_config_dict
([
'firewall'
],
key_mangling
=
(
'-'
,
'_'
),
get_first_key
=
True
,
no_tag_node_value_mangle
=
True
)
if
'zone'
in
zone_policy
:
# We have gathered the dict representation of the CLI, but there are default
# options which we need to update into the dictionary retrived.
default_values
=
defaults
(
base
+
[
'zone'
])
for
zone
in
zone_policy
[
'zone'
]:
zone_policy
[
'zone'
][
zone
]
=
dict_merge
(
default_values
,
zone_policy
[
'zone'
][
zone
])
return
zone_policy
def
verify
(
zone_policy
):
# bail out early - looks like removal from running config
if
not
zone_policy
:
return
None
local_zone
=
False
interfaces
=
[]
if
'zone'
in
zone_policy
:
for
zone
,
zone_conf
in
zone_policy
[
'zone'
]
.
items
():
if
'local_zone'
not
in
zone_conf
and
'interface'
not
in
zone_conf
:
raise
ConfigError
(
f
'Zone "{zone}" has no interfaces and is not the local zone'
)
if
'local_zone'
in
zone_conf
:
if
local_zone
:
raise
ConfigError
(
'There cannot be multiple local zones'
)
if
'interface'
in
zone_conf
:
raise
ConfigError
(
'Local zone cannot have interfaces assigned'
)
if
'intra_zone_filtering'
in
zone_conf
:
raise
ConfigError
(
'Local zone cannot use intra-zone-filtering'
)
local_zone
=
True
if
'interface'
in
zone_conf
:
found_duplicates
=
[
intf
for
intf
in
zone_conf
[
'interface'
]
if
intf
in
interfaces
]
if
found_duplicates
:
raise
ConfigError
(
f
'Interfaces cannot be assigned to multiple zones'
)
interfaces
+=
zone_conf
[
'interface'
]
if
'intra_zone_filtering'
in
zone_conf
:
intra_zone
=
zone_conf
[
'intra_zone_filtering'
]
if
len
(
intra_zone
)
>
1
:
raise
ConfigError
(
'Only one intra-zone-filtering action must be specified'
)
if
'firewall'
in
intra_zone
:
v4_name
=
dict_search_args
(
intra_zone
,
'firewall'
,
'name'
)
if
v4_name
and
not
dict_search_args
(
zone_policy
,
'firewall'
,
'name'
,
v4_name
):
raise
ConfigError
(
f
'Firewall name "{v4_name}" does not exist'
)
v6_name
=
dict_search_args
(
intra_zone
,
'firewall'
,
'ipv6-name'
)
if
v6_name
and
not
dict_search_args
(
zone_policy
,
'firewall'
,
'ipv6-name'
,
v6_name
):
raise
ConfigError
(
f
'Firewall ipv6-name "{v6_name}" does not exist'
)
if
not
v4_name
and
not
v6_name
:
raise
ConfigError
(
'No firewall names specified for intra-zone-filtering'
)
if
'from'
in
zone_conf
:
for
from_zone
,
from_conf
in
zone_conf
[
'from'
]
.
items
():
if
from_zone
not
in
zone_policy
[
'zone'
]:
raise
ConfigError
(
f
'Zone "{zone}" refers to a non-existent or deleted zone "{from_zone}"'
)
v4_name
=
dict_search_args
(
from_conf
,
'firewall'
,
'name'
)
if
v4_name
:
if
'name'
not
in
zone_policy
[
'firewall'
]:
raise
ConfigError
(
f
'Firewall name "{v4_name}" does not exist'
)
if
not
dict_search_args
(
zone_policy
,
'firewall'
,
'name'
,
v4_name
):
raise
ConfigError
(
f
'Firewall name "{v4_name}" does not exist'
)
v6_name
=
dict_search_args
(
from_conf
,
'firewall'
,
'v6_name'
)
if
v6_name
:
if
'ipv6_name'
not
in
zone_policy
[
'firewall'
]:
raise
ConfigError
(
f
'Firewall ipv6-name "{v6_name}" does not exist'
)
if
not
dict_search_args
(
zone_policy
,
'firewall'
,
'ipv6_name'
,
v6_name
):
raise
ConfigError
(
f
'Firewall ipv6-name "{v6_name}" does not exist'
)
return
None
def
has_ipv4_fw
(
zone_conf
):
if
'from'
not
in
zone_conf
:
return
False
zone_from
=
zone_conf
[
'from'
]
return
any
([
True
for
fz
in
zone_from
if
dict_search_args
(
zone_from
,
fz
,
'firewall'
,
'name'
)])
def
has_ipv6_fw
(
zone_conf
):
if
'from'
not
in
zone_conf
:
return
False
zone_from
=
zone_conf
[
'from'
]
return
any
([
True
for
fz
in
zone_from
if
dict_search_args
(
zone_from
,
fz
,
'firewall'
,
'ipv6_name'
)])
def
get_local_from
(
zone_policy
,
local_zone_name
):
# Get all zone firewall names from the local zone
out
=
{}
for
zone
,
zone_conf
in
zone_policy
[
'zone'
]
.
items
():
if
zone
==
local_zone_name
:
continue
if
'from'
not
in
zone_conf
:
continue
if
local_zone_name
in
zone_conf
[
'from'
]:
out
[
zone
]
=
zone_conf
[
'from'
][
local_zone_name
]
return
out
def
cleanup_commands
():
commands
=
[]
for
table
in
[
'ip filter'
,
'ip6 filter'
]:
json_str
=
cmd
(
f
'nft -j list table {table}'
)
obj
=
loads
(
json_str
)
if
'nftables'
not
in
obj
:
continue
for
item
in
obj
[
'nftables'
]:
if
'rule'
in
item
:
chain
=
item
[
'rule'
][
'chain'
]
handle
=
item
[
'rule'
][
'handle'
]
if
'expr'
not
in
item
[
'rule'
]:
continue
for
expr
in
item
[
'rule'
][
'expr'
]:
target
=
dict_search_args
(
expr
,
'jump'
,
'target'
)
if
not
target
:
continue
if
target
.
startswith
(
"VZONE"
)
or
target
.
startswith
(
"VYOS_STATE_POLICY"
):
commands
.
append
(
f
'delete rule {table} {chain} handle {handle}'
)
for
item
in
obj
[
'nftables'
]:
if
'chain'
in
item
:
if
item
[
'chain'
][
'name'
]
.
startswith
(
"VZONE"
):
chain
=
item
[
'chain'
][
'name'
]
commands
.
append
(
f
'delete chain {table} {chain}'
)
return
commands
def
generate
(
zone_policy
):
data
=
zone_policy
or
{}
if
os
.
path
.
exists
(
nftables_conf
):
# Check to see if we've run before
data
[
'cleanup_commands'
]
=
cleanup_commands
()
if
'zone'
in
data
:
for
zone
,
zone_conf
in
data
[
'zone'
]
.
items
():
zone_conf
[
'ipv4'
]
=
has_ipv4_fw
(
zone_conf
)
zone_conf
[
'ipv6'
]
=
has_ipv6_fw
(
zone_conf
)
if
'local_zone'
in
zone_conf
:
zone_conf
[
'from_local'
]
=
get_local_from
(
data
,
zone
)
render
(
nftables_conf
,
'zone_policy/nftables.tmpl'
,
data
)
return
None
def
apply
(
zone_policy
):
install_result
=
run
(
f
'nft -f {nftables_conf}'
)
if
install_result
!=
0
:
raise
ConfigError
(
'Failed to apply zone-policy'
)
return
None
if
__name__
==
'__main__'
:
try
:
c
=
get_config
()
verify
(
c
)
generate
(
c
)
apply
(
c
)
except
ConfigError
as
e
:
print
(
e
)
exit
(
1
)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Tue, Dec 9, 10:51 PM (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3071080
Default Alt Text
zone_policy.py (8 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment