Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F38643658
base_interfaces_test.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
36 KB
Referenced Files
None
Subscribers
None
base_interfaces_test.py
View Options
# Copyright (C) 2019-2022 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
os
import
unittest
from
binascii
import
hexlify
from
netifaces
import
AF_INET
from
netifaces
import
AF_INET6
from
netifaces
import
ifaddresses
from
netifaces
import
interfaces
from
base_vyostest_shim
import
VyOSUnitTestSHIM
from
vyos.configsession
import
ConfigSession
from
vyos.configsession
import
ConfigSessionError
from
vyos.ifconfig
import
Interface
from
vyos.ifconfig
import
Section
from
vyos.util
import
read_file
from
vyos.util
import
cmd
from
vyos.util
import
dict_search
from
vyos.util
import
process_named_running
from
vyos.util
import
get_interface_config
from
vyos.validate
import
is_intf_addr_assigned
from
vyos.validate
import
is_ipv6_link_local
def
is_mirrored_to
(
interface
,
mirror_if
,
qdisc
):
"""
Ask TC if we are mirroring traffic to a discrete interface.
interface: source interface
mirror_if: destination where we mirror our data to
qdisc: must be ffff or 1 for ingress/egress
"""
if
qdisc
not
in
[
'ffff'
,
'1'
]:
raise
ValueError
()
ret_val
=
False
tmp
=
cmd
(
f
'tc -s -p filter ls dev {interface} parent {qdisc}: | grep mirred'
)
tmp
=
tmp
.
lower
()
if
mirror_if
in
tmp
:
ret_val
=
True
return
ret_val
class
BasicInterfaceTest
:
class
TestCase
(
VyOSUnitTestSHIM
.
TestCase
):
_test_dhcp
=
False
_test_ip
=
False
_test_mtu
=
False
_test_vlan
=
False
_test_qinq
=
False
_test_ipv6
=
False
_test_ipv6_pd
=
False
_test_ipv6_dhcpc6
=
False
_test_mirror
=
False
_base_path
=
[]
_options
=
{}
_interfaces
=
[]
_qinq_range
=
[
'10'
,
'20'
,
'30'
]
_vlan_range
=
[
'100'
,
'200'
,
'300'
,
'2000'
]
_test_addr
=
[
'192.0.2.1/26'
,
'192.0.2.255/31'
,
'192.0.2.64/32'
,
'2001:db8:1::ffff/64'
,
'2001:db8:101::1/112'
]
_mirror_interfaces
=
[]
# choose IPv6 minimum MTU value for tests - this must always work
_mtu
=
'1280'
@classmethod
def
setUpClass
(
cls
):
super
(
BasicInterfaceTest
.
TestCase
,
cls
)
.
setUpClass
()
# Setup mirror interfaces for SPAN (Switch Port Analyzer)
for
span
in
cls
.
_mirror_interfaces
:
section
=
Section
.
section
(
span
)
cls
.
cli_set
(
cls
,
[
'interfaces'
,
section
,
span
])
@classmethod
def
tearDownClass
(
cls
):
# Tear down mirror interfaces for SPAN (Switch Port Analyzer)
for
span
in
cls
.
_mirror_interfaces
:
section
=
Section
.
section
(
span
)
cls
.
cli_delete
(
cls
,
[
'interfaces'
,
section
,
span
])
super
(
BasicInterfaceTest
.
TestCase
,
cls
)
.
tearDownClass
()
def
tearDown
(
self
):
self
.
cli_delete
(
self
.
_base_path
)
self
.
cli_commit
()
# Verify that no previously interface remained on the system
for
intf
in
self
.
_interfaces
:
self
.
assertNotIn
(
intf
,
interfaces
())
# No daemon that was started during a test should remain running
for
daemon
in
[
'dhcp6c'
,
'dhclient'
]:
self
.
assertFalse
(
process_named_running
(
daemon
))
def
test_dhcp_disable_interface
(
self
):
if
not
self
.
_test_dhcp
:
self
.
skipTest
(
'not supported'
)
# When interface is configured as admin down, it must be admin down
# even when dhcpc starts on the given interface
for
interface
in
self
.
_interfaces
:
self
.
cli_set
(
self
.
_base_path
+
[
interface
,
'disable'
])
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
self
.
_base_path
+
[
interface
]
+
option
.
split
())
self
.
cli_set
(
self
.
_base_path
+
[
interface
,
'disable'
])
# Also enable DHCP (ISC DHCP always places interface in admin up
# state so we check that we do not start DHCP client.
# https://phabricator.vyos.net/T2767
self
.
cli_set
(
self
.
_base_path
+
[
interface
,
'address'
,
'dhcp'
])
self
.
cli_commit
()
# Validate interface state
for
interface
in
self
.
_interfaces
:
flags
=
read_file
(
f
'/sys/class/net/{interface}/flags'
)
self
.
assertEqual
(
int
(
flags
,
16
)
&
1
,
0
)
def
test_span_mirror
(
self
):
if
not
self
.
_mirror_interfaces
:
self
.
skipTest
(
'not supported'
)
# Check the two-way mirror rules of ingress and egress
for
mirror
in
self
.
_mirror_interfaces
:
for
interface
in
self
.
_interfaces
:
self
.
cli_set
(
self
.
_base_path
+
[
interface
,
'mirror'
,
'ingress'
,
mirror
])
self
.
cli_set
(
self
.
_base_path
+
[
interface
,
'mirror'
,
'egress'
,
mirror
])
self
.
cli_commit
()
# Verify config
for
mirror
in
self
.
_mirror_interfaces
:
for
interface
in
self
.
_interfaces
:
self
.
assertTrue
(
is_mirrored_to
(
interface
,
mirror
,
'ffff'
))
self
.
assertTrue
(
is_mirrored_to
(
interface
,
mirror
,
'1'
))
def
test_interface_disable
(
self
):
# Check if description can be added to interface and
# can be read back
for
intf
in
self
.
_interfaces
:
self
.
cli_set
(
self
.
_base_path
+
[
intf
,
'disable'
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
cli_set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
self
.
cli_commit
()
# Validate interface description
for
intf
in
self
.
_interfaces
:
self
.
assertEqual
(
Interface
(
intf
)
.
get_admin_state
(),
'down'
)
def
test_interface_description
(
self
):
# Check if description can be added to interface and
# can be read back
for
intf
in
self
.
_interfaces
:
test_string
=
f
'Description-Test-{intf}'
self
.
cli_set
(
self
.
_base_path
+
[
intf
,
'description'
,
test_string
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
cli_set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
self
.
cli_commit
()
# Validate interface description
for
intf
in
self
.
_interfaces
:
test_string
=
f
'Description-Test-{intf}'
tmp
=
read_file
(
f
'/sys/class/net/{intf}/ifalias'
)
self
.
assertEqual
(
tmp
,
test_string
)
self
.
assertEqual
(
Interface
(
intf
)
.
get_alias
(),
test_string
)
self
.
cli_delete
(
self
.
_base_path
+
[
intf
,
'description'
])
self
.
cli_commit
()
# Validate remove interface description "empty"
for
intf
in
self
.
_interfaces
:
tmp
=
read_file
(
f
'/sys/class/net/{intf}/ifalias'
)
self
.
assertEqual
(
tmp
,
str
())
self
.
assertEqual
(
Interface
(
intf
)
.
get_alias
(),
str
())
def
test_add_single_ip_address
(
self
):
addr
=
'192.0.2.0/31'
for
intf
in
self
.
_interfaces
:
self
.
cli_set
(
self
.
_base_path
+
[
intf
,
'address'
,
addr
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
cli_set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
self
.
cli_commit
()
for
intf
in
self
.
_interfaces
:
self
.
assertTrue
(
is_intf_addr_assigned
(
intf
,
addr
))
self
.
assertEqual
(
Interface
(
intf
)
.
get_admin_state
(),
'up'
)
def
test_add_multiple_ip_addresses
(
self
):
# Add address
for
intf
in
self
.
_interfaces
:
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
cli_set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
for
addr
in
self
.
_test_addr
:
self
.
cli_set
(
self
.
_base_path
+
[
intf
,
'address'
,
addr
])
self
.
cli_commit
()
# Validate address
for
intf
in
self
.
_interfaces
:
for
af
in
AF_INET
,
AF_INET6
:
for
addr
in
ifaddresses
(
intf
)[
af
]:
# checking link local addresses makes no sense
if
is_ipv6_link_local
(
addr
[
'addr'
]):
continue
self
.
assertTrue
(
is_intf_addr_assigned
(
intf
,
addr
[
'addr'
]))
def
test_ipv6_link_local_address
(
self
):
# Common function for IPv6 link-local address assignemnts
if
not
self
.
_test_ipv6
:
self
.
skipTest
(
'not supported'
)
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
# just set the interface base without any option - some interfaces
# (VTI) do not require any option to be brought up
self
.
cli_set
(
base
)
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
# after commit we must have an IPv6 link-local address
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
self
.
assertIn
(
AF_INET6
,
ifaddresses
(
interface
))
for
addr
in
ifaddresses
(
interface
)[
AF_INET6
]:
self
.
assertTrue
(
is_ipv6_link_local
(
addr
[
'addr'
]))
# disable IPv6 link-local address assignment
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
self
.
cli_set
(
base
+
[
'ipv6'
,
'address'
,
'no-default-link-local'
])
# after commit we must have no IPv6 link-local address
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
self
.
assertNotIn
(
AF_INET6
,
ifaddresses
(
interface
))
def
test_interface_mtu
(
self
):
if
not
self
.
_test_mtu
:
self
.
skipTest
(
'not supported'
)
for
intf
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
intf
]
self
.
cli_set
(
base
+
[
'mtu'
,
self
.
_mtu
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
# commit interface changes
self
.
cli_commit
()
# verify changed MTU
for
intf
in
self
.
_interfaces
:
tmp
=
get_interface_config
(
intf
)
self
.
assertEqual
(
tmp
[
'mtu'
],
int
(
self
.
_mtu
))
def
test_mtu_1200_no_ipv6_interface
(
self
):
# Testcase if MTU can be changed to 1200 on non IPv6
# enabled interfaces
if
not
self
.
_test_mtu
:
self
.
skipTest
(
'not supported'
)
old_mtu
=
self
.
_mtu
self
.
_mtu
=
'1200'
for
intf
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
intf
]
self
.
cli_set
(
base
+
[
'mtu'
,
self
.
_mtu
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
# check validate() - can not set low MTU if 'no-default-link-local'
# is not set on CLI
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
for
intf
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
intf
]
self
.
cli_set
(
base
+
[
'ipv6'
,
'address'
,
'no-default-link-local'
])
# commit interface changes
self
.
cli_commit
()
# verify changed MTU
for
intf
in
self
.
_interfaces
:
tmp
=
get_interface_config
(
intf
)
self
.
assertEqual
(
tmp
[
'mtu'
],
int
(
self
.
_mtu
))
self
.
_mtu
=
old_mtu
def
test_vif_8021q_interfaces
(
self
):
# XXX: This testcase is not allowed to run as first testcase, reason
# is the Wireless test will first load the wifi kernel hwsim module
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if
not
self
.
_test_vlan
:
self
.
skipTest
(
'not supported'
)
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
for
vlan
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
for
address
in
self
.
_test_addr
:
self
.
cli_set
(
base
+
[
'address'
,
address
])
self
.
cli_commit
()
for
intf
in
self
.
_interfaces
:
for
vlan
in
self
.
_vlan_range
:
vif
=
f
'{intf}.{vlan}'
for
address
in
self
.
_test_addr
:
self
.
assertTrue
(
is_intf_addr_assigned
(
vif
,
address
))
self
.
assertEqual
(
Interface
(
vif
)
.
get_admin_state
(),
'up'
)
# T4064: Delete interface addresses, keep VLAN interface
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
vlan
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
self
.
cli_delete
(
base
+
[
'address'
])
self
.
cli_commit
()
# Verify no IP address is assigned
for
interface
in
self
.
_interfaces
:
for
vlan
in
self
.
_vlan_range
:
vif
=
f
'{intf}.{vlan}'
for
address
in
self
.
_test_addr
:
self
.
assertFalse
(
is_intf_addr_assigned
(
vif
,
address
))
def
test_vif_8021q_mtu_limits
(
self
):
# XXX: This testcase is not allowed to run as first testcase, reason
# is the Wireless test will first load the wifi kernel hwsim module
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if
not
self
.
_test_vlan
:
self
.
skipTest
(
'not supported'
)
mtu_1500
=
'1500'
mtu_9000
=
'9000'
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
self
.
cli_set
(
base
+
[
'mtu'
,
mtu_1500
])
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
if
'source-interface'
in
option
:
iface
=
option
.
split
()[
-
1
]
iface_type
=
Section
.
section
(
iface
)
self
.
cli_set
([
'interfaces'
,
iface_type
,
iface
,
'mtu'
,
mtu_9000
])
for
vlan
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
self
.
cli_set
(
base
+
[
'mtu'
,
mtu_9000
])
# check validate() - VIF MTU must not be larger the parent interface
# MTU size.
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
# Change MTU on base interface to be the same as on the VIF interface
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
self
.
cli_set
(
base
+
[
'mtu'
,
mtu_9000
])
self
.
cli_commit
()
# Verify MTU on base and VIF interfaces
for
interface
in
self
.
_interfaces
:
tmp
=
get_interface_config
(
interface
)
self
.
assertEqual
(
tmp
[
'mtu'
],
int
(
mtu_9000
))
for
vlan
in
self
.
_vlan_range
:
tmp
=
get_interface_config
(
f
'{interface}.{vlan}'
)
self
.
assertEqual
(
tmp
[
'mtu'
],
int
(
mtu_9000
))
def
test_vif_8021q_qos_change
(
self
):
# XXX: This testcase is not allowed to run as first testcase, reason
# is the Wireless test will first load the wifi kernel hwsim module
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if
not
self
.
_test_vlan
:
self
.
skipTest
(
'not supported'
)
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
for
vlan
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
self
.
cli_set
(
base
+
[
'ingress-qos'
,
'0:1'
])
self
.
cli_set
(
base
+
[
'egress-qos'
,
'1:6'
])
self
.
cli_commit
()
for
intf
in
self
.
_interfaces
:
for
vlan
in
self
.
_vlan_range
:
vif
=
f
'{intf}.{vlan}'
tmp
=
get_interface_config
(
f
'{vif}'
)
tmp2
=
dict_search
(
'linkinfo.info_data.ingress_qos'
,
tmp
)
for
item
in
tmp2
if
tmp2
else
[]:
from_key
=
item
[
'from'
]
to_key
=
item
[
'to'
]
self
.
assertEqual
(
from_key
,
0
)
self
.
assertEqual
(
to_key
,
1
)
tmp2
=
dict_search
(
'linkinfo.info_data.egress_qos'
,
tmp
)
for
item
in
tmp2
if
tmp2
else
[]:
from_key
=
item
[
'from'
]
to_key
=
item
[
'to'
]
self
.
assertEqual
(
from_key
,
1
)
self
.
assertEqual
(
to_key
,
6
)
self
.
assertEqual
(
Interface
(
vif
)
.
get_admin_state
(),
'up'
)
new_ingress_qos_from
=
1
new_ingress_qos_to
=
6
new_egress_qos_from
=
2
new_egress_qos_to
=
7
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
vlan
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
self
.
cli_set
(
base
+
[
'ingress-qos'
,
f
'{new_ingress_qos_from}:{new_ingress_qos_to}'
])
self
.
cli_set
(
base
+
[
'egress-qos'
,
f
'{new_egress_qos_from}:{new_egress_qos_to}'
])
self
.
cli_commit
()
for
intf
in
self
.
_interfaces
:
for
vlan
in
self
.
_vlan_range
:
vif
=
f
'{intf}.{vlan}'
tmp
=
get_interface_config
(
f
'{vif}'
)
tmp2
=
dict_search
(
'linkinfo.info_data.ingress_qos'
,
tmp
)
if
tmp2
:
from_key
=
tmp2
[
0
][
'from'
]
to_key
=
tmp2
[
0
][
'to'
]
self
.
assertEqual
(
from_key
,
new_ingress_qos_from
)
self
.
assertEqual
(
to_key
,
new_ingress_qos_to
)
tmp2
=
dict_search
(
'linkinfo.info_data.egress_qos'
,
tmp
)
if
tmp2
:
from_key
=
tmp2
[
0
][
'from'
]
to_key
=
tmp2
[
0
][
'to'
]
self
.
assertEqual
(
from_key
,
new_egress_qos_from
)
self
.
assertEqual
(
to_key
,
new_egress_qos_to
)
def
test_vif_8021q_lower_up_down
(
self
):
# Testcase for https://phabricator.vyos.net/T3349
if
not
self
.
_test_vlan
:
self
.
skipTest
(
'not supported'
)
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
# disable the lower interface
self
.
cli_set
(
base
+
[
'disable'
])
for
vlan
in
self
.
_vlan_range
:
vlan_base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
# disable the vlan interface
self
.
cli_set
(
vlan_base
+
[
'disable'
])
self
.
cli_commit
()
# re-enable all lower interfaces
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
self
.
cli_delete
(
base
+
[
'disable'
])
self
.
cli_commit
()
# verify that the lower interfaces are admin up and the vlan
# interfaces are all admin down
for
interface
in
self
.
_interfaces
:
self
.
assertEqual
(
Interface
(
interface
)
.
get_admin_state
(),
'up'
)
for
vlan
in
self
.
_vlan_range
:
ifname
=
f
'{interface}.{vlan}'
self
.
assertEqual
(
Interface
(
ifname
)
.
get_admin_state
(),
'down'
)
def
test_vif_s_8021ad_vlan_interfaces
(
self
):
# XXX: This testcase is not allowed to run as first testcase, reason
# is the Wireless test will first load the wifi kernel hwsim module
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if
not
self
.
_test_qinq
:
self
.
skipTest
(
'not supported'
)
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
for
vif_s
in
self
.
_qinq_range
:
for
vif_c
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif-s'
,
vif_s
,
'vif-c'
,
vif_c
]
self
.
cli_set
(
base
+
[
'mtu'
,
self
.
_mtu
])
for
address
in
self
.
_test_addr
:
self
.
cli_set
(
base
+
[
'address'
,
address
])
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
for
vif_s
in
self
.
_qinq_range
:
tmp
=
get_interface_config
(
f
'{interface}.{vif_s}'
)
self
.
assertEqual
(
dict_search
(
'linkinfo.info_data.protocol'
,
tmp
),
'802.1ad'
)
for
vif_c
in
self
.
_vlan_range
:
vif
=
f
'{interface}.{vif_s}.{vif_c}'
# For an unknown reason this regularely fails on the QEMU builds,
# thus the test for reading back IP addresses is temporary
# disabled. There is no big deal here, as this uses the same
# methods on 802.1q and here it works and is verified.
# for address in self._test_addr:
# self.assertTrue(is_intf_addr_assigned(vif, address))
tmp
=
get_interface_config
(
vif
)
self
.
assertEqual
(
tmp
[
'mtu'
],
int
(
self
.
_mtu
))
# T4064: Delete interface addresses, keep VLAN interface
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
vif_s
in
self
.
_qinq_range
:
for
vif_c
in
self
.
_vlan_range
:
self
.
cli_delete
(
self
.
_base_path
+
[
interface
,
'vif-s'
,
vif_s
,
'vif-c'
,
vif_c
,
'address'
])
self
.
cli_commit
()
# Verify no IP address is assigned
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
vif_s
in
self
.
_qinq_range
:
for
vif_c
in
self
.
_vlan_range
:
vif
=
f
'{interface}.{vif_s}.{vif_c}'
for
address
in
self
.
_test_addr
:
self
.
assertFalse
(
is_intf_addr_assigned
(
vif
,
address
))
# T3972: remove vif-c interfaces from vif-s
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
vif_s
in
self
.
_qinq_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif-s'
,
vif_s
,
'vif-c'
]
self
.
cli_delete
(
base
)
self
.
cli_commit
()
def
test_vif_s_protocol_change
(
self
):
# XXX: This testcase is not allowed to run as first testcase, reason
# is the Wireless test will first load the wifi kernel hwsim module
# which creates a wlan0 and wlan1 interface which will fail the
# tearDown() test in the end that no interface is allowed to survive!
if
not
self
.
_test_qinq
:
self
.
skipTest
(
'not supported'
)
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
base
+
option
.
split
())
for
vif_s
in
self
.
_qinq_range
:
for
vif_c
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif-s'
,
vif_s
,
'vif-c'
,
vif_c
]
for
address
in
self
.
_test_addr
:
self
.
cli_set
(
base
+
[
'address'
,
address
])
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
for
vif_s
in
self
.
_qinq_range
:
tmp
=
get_interface_config
(
f
'{interface}.{vif_s}'
)
# check for the default value
self
.
assertEqual
(
tmp
[
'linkinfo'
][
'info_data'
][
'protocol'
],
'802.1ad'
)
# T3532: now change ethertype
new_protocol
=
'802.1q'
for
interface
in
self
.
_interfaces
:
for
vif_s
in
self
.
_qinq_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif-s'
,
vif_s
]
self
.
cli_set
(
base
+
[
'protocol'
,
new_protocol
])
self
.
cli_commit
()
# Verify new ethertype configuration
for
interface
in
self
.
_interfaces
:
for
vif_s
in
self
.
_qinq_range
:
tmp
=
get_interface_config
(
f
'{interface}.{vif_s}'
)
self
.
assertEqual
(
tmp
[
'linkinfo'
][
'info_data'
][
'protocol'
],
new_protocol
.
upper
())
def
test_interface_ip_options
(
self
):
if
not
self
.
_test_ip
:
self
.
skipTest
(
'not supported'
)
arp_tmo
=
'300'
mss
=
'1420'
for
interface
in
self
.
_interfaces
:
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
path
+
option
.
split
())
# Options
self
.
cli_set
(
path
+
[
'ip'
,
'adjust-mss'
,
mss
])
self
.
cli_set
(
path
+
[
'ip'
,
'arp-cache-timeout'
,
arp_tmo
])
self
.
cli_set
(
path
+
[
'ip'
,
'disable-arp-filter'
])
self
.
cli_set
(
path
+
[
'ip'
,
'disable-forwarding'
])
self
.
cli_set
(
path
+
[
'ip'
,
'enable-directed-broadcast'
])
self
.
cli_set
(
path
+
[
'ip'
,
'enable-arp-accept'
])
self
.
cli_set
(
path
+
[
'ip'
,
'enable-arp-announce'
])
self
.
cli_set
(
path
+
[
'ip'
,
'enable-arp-ignore'
])
self
.
cli_set
(
path
+
[
'ip'
,
'enable-proxy-arp'
])
self
.
cli_set
(
path
+
[
'ip'
,
'proxy-arp-pvlan'
])
self
.
cli_set
(
path
+
[
'ip'
,
'source-validation'
,
'loose'
])
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
base_options
=
f
'oifname "{interface}"'
out
=
cmd
(
'sudo nft list chain raw VYOS_TCP_MSS'
)
for
line
in
out
.
splitlines
():
if
line
.
startswith
(
base_options
):
self
.
assertIn
(
f
'tcp option maxseg size set {mss}'
,
line
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms'
)
self
.
assertEqual
(
tmp
,
str
((
int
(
arp_tmo
)
*
1000
)))
# tmo value is in milli seconds
proc_base
=
f
'/proc/sys/net/ipv4/conf/{interface}'
tmp
=
read_file
(
f
'{proc_base}/arp_filter'
)
self
.
assertEqual
(
'0'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/arp_accept'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/arp_announce'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/arp_ignore'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/forwarding'
)
self
.
assertEqual
(
'0'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/bc_forwarding'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/proxy_arp'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/proxy_arp_pvlan'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/rp_filter'
)
self
.
assertEqual
(
'2'
,
tmp
)
def
test_interface_ipv6_options
(
self
):
if
not
self
.
_test_ipv6
:
self
.
skipTest
(
'not supported'
)
mss
=
'1400'
dad_transmits
=
'10'
for
interface
in
self
.
_interfaces
:
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
path
+
option
.
split
())
# Options
self
.
cli_set
(
path
+
[
'ipv6'
,
'adjust-mss'
,
mss
])
self
.
cli_set
(
path
+
[
'ipv6'
,
'disable-forwarding'
])
self
.
cli_set
(
path
+
[
'ipv6'
,
'dup-addr-detect-transmits'
,
dad_transmits
])
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
base_options
=
f
'oifname "{interface}"'
out
=
cmd
(
'sudo nft list chain ip6 raw VYOS_TCP_MSS'
)
for
line
in
out
.
splitlines
():
if
line
.
startswith
(
base_options
):
self
.
assertIn
(
f
'tcp option maxseg size set {mss}'
,
line
)
proc_base
=
f
'/proc/sys/net/ipv6/conf/{interface}'
tmp
=
read_file
(
f
'{proc_base}/forwarding'
)
self
.
assertEqual
(
'0'
,
tmp
)
tmp
=
read_file
(
f
'{proc_base}/dad_transmits'
)
self
.
assertEqual
(
dad_transmits
,
tmp
)
def
test_dhcpv6_client_options
(
self
):
if
not
self
.
_test_ipv6_dhcpc6
:
self
.
skipTest
(
'not supported'
)
duid_base
=
10
for
interface
in
self
.
_interfaces
:
duid
=
'00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'
.
format
(
duid_base
)
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
path
+
option
.
split
())
# Enable DHCPv6 client
self
.
cli_set
(
path
+
[
'address'
,
'dhcpv6'
])
self
.
cli_set
(
path
+
[
'dhcpv6-options'
,
'rapid-commit'
])
self
.
cli_set
(
path
+
[
'dhcpv6-options'
,
'parameters-only'
])
self
.
cli_set
(
path
+
[
'dhcpv6-options'
,
'duid'
,
duid
])
duid_base
+=
1
self
.
cli_commit
()
duid_base
=
10
for
interface
in
self
.
_interfaces
:
duid
=
'00:01:00:01:27:71:db:f0:00:50:00:00:00:{}'
.
format
(
duid_base
)
dhcpc6_config
=
read_file
(
f
'/run/dhcp6c/dhcp6c.{interface}.conf'
)
self
.
assertIn
(
f
'interface {interface} '
+
'{'
,
dhcpc6_config
)
self
.
assertIn
(
f
' request domain-name-servers;'
,
dhcpc6_config
)
self
.
assertIn
(
f
' request domain-name;'
,
dhcpc6_config
)
self
.
assertIn
(
f
' information-only;'
,
dhcpc6_config
)
self
.
assertIn
(
f
' send ia-na 0;'
,
dhcpc6_config
)
self
.
assertIn
(
f
' send rapid-commit;'
,
dhcpc6_config
)
self
.
assertIn
(
f
' send client-id {duid};'
,
dhcpc6_config
)
self
.
assertIn
(
'};'
,
dhcpc6_config
)
duid_base
+=
1
# Check for running process
self
.
assertTrue
(
process_named_running
(
'dhcp6c'
))
def
test_dhcpv6pd_auto_sla_id
(
self
):
if
not
self
.
_test_ipv6_pd
:
self
.
skipTest
(
'not supported'
)
prefix_len
=
'56'
sla_len
=
str
(
64
-
int
(
prefix_len
))
delegatees
=
[
'dum2340'
,
'dum2341'
,
'dum2342'
,
'dum2343'
,
'dum2344'
]
for
interface
in
self
.
_interfaces
:
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
path
+
option
.
split
())
address
=
'1'
# prefix delegation stuff
pd_base
=
path
+
[
'dhcpv6-options'
,
'pd'
,
'0'
]
self
.
cli_set
(
pd_base
+
[
'length'
,
prefix_len
])
for
delegatee
in
delegatees
:
section
=
Section
.
section
(
delegatee
)
self
.
cli_set
([
'interfaces'
,
section
,
delegatee
])
self
.
cli_set
(
pd_base
+
[
'interface'
,
delegatee
,
'address'
,
address
])
# increment interface address
address
=
str
(
int
(
address
)
+
1
)
self
.
cli_commit
()
for
interface
in
self
.
_interfaces
:
dhcpc6_config
=
read_file
(
f
'/run/dhcp6c/dhcp6c.{interface}.conf'
)
# verify DHCPv6 prefix delegation
self
.
assertIn
(
f
'prefix ::/{prefix_len} infinity;'
,
dhcpc6_config
)
address
=
'1'
sla_id
=
'0'
for
delegatee
in
delegatees
:
self
.
assertIn
(
f
'prefix-interface {delegatee}'
+
r' {'
,
dhcpc6_config
)
self
.
assertIn
(
f
'ifid {address};'
,
dhcpc6_config
)
self
.
assertIn
(
f
'sla-id {sla_id};'
,
dhcpc6_config
)
self
.
assertIn
(
f
'sla-len {sla_len};'
,
dhcpc6_config
)
# increment sla-id
sla_id
=
str
(
int
(
sla_id
)
+
1
)
# increment interface address
address
=
str
(
int
(
address
)
+
1
)
# Check for running process
self
.
assertTrue
(
process_named_running
(
'dhcp6c'
))
for
delegatee
in
delegatees
:
# we can already cleanup the test delegatee interface here
# as until commit() is called, nothing happens
section
=
Section
.
section
(
delegatee
)
self
.
cli_delete
([
'interfaces'
,
section
,
delegatee
])
def
test_dhcpv6pd_manual_sla_id
(
self
):
if
not
self
.
_test_ipv6_pd
:
self
.
skipTest
(
'not supported'
)
prefix_len
=
'56'
sla_len
=
str
(
64
-
int
(
prefix_len
))
delegatees
=
[
'dum3340'
,
'dum3341'
,
'dum3342'
,
'dum3343'
,
'dum3344'
]
for
interface
in
self
.
_interfaces
:
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
cli_set
(
path
+
option
.
split
())
# prefix delegation stuff
address
=
'1'
sla_id
=
'1'
pd_base
=
path
+
[
'dhcpv6-options'
,
'pd'
,
'0'
]
self
.
cli_set
(
pd_base
+
[
'length'
,
prefix_len
])
for
delegatee
in
delegatees
:
section
=
Section
.
section
(
delegatee
)
self
.
cli_set
([
'interfaces'
,
section
,
delegatee
])
self
.
cli_set
(
pd_base
+
[
'interface'
,
delegatee
,
'address'
,
address
])
self
.
cli_set
(
pd_base
+
[
'interface'
,
delegatee
,
'sla-id'
,
sla_id
])
# increment interface address
address
=
str
(
int
(
address
)
+
1
)
sla_id
=
str
(
int
(
sla_id
)
+
1
)
self
.
cli_commit
()
# Verify dhcpc6 client configuration
for
interface
in
self
.
_interfaces
:
address
=
'1'
sla_id
=
'1'
dhcpc6_config
=
read_file
(
f
'/run/dhcp6c/dhcp6c.{interface}.conf'
)
# verify DHCPv6 prefix delegation
self
.
assertIn
(
f
'prefix ::/{prefix_len} infinity;'
,
dhcpc6_config
)
for
delegatee
in
delegatees
:
self
.
assertIn
(
f
'prefix-interface {delegatee}'
+
r' {'
,
dhcpc6_config
)
self
.
assertIn
(
f
'ifid {address};'
,
dhcpc6_config
)
self
.
assertIn
(
f
'sla-id {sla_id};'
,
dhcpc6_config
)
self
.
assertIn
(
f
'sla-len {sla_len};'
,
dhcpc6_config
)
# increment sla-id
sla_id
=
str
(
int
(
sla_id
)
+
1
)
# increment interface address
address
=
str
(
int
(
address
)
+
1
)
# Check for running process
self
.
assertTrue
(
process_named_running
(
'dhcp6c'
))
for
delegatee
in
delegatees
:
# we can already cleanup the test delegatee interface here
# as until commit() is called, nothing happens
section
=
Section
.
section
(
delegatee
)
self
.
cli_delete
([
'interfaces'
,
section
,
delegatee
])
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Dec 15, 5:36 PM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3071726
Default Alt Text
base_interfaces_test.py (36 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment