Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F38643509
base_interfaces_test.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
15 KB
Referenced Files
None
Subscribers
None
base_interfaces_test.py
View Options
# Copyright (C) 2019-2020 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
os
import
unittest
import
json
from
netifaces
import
ifaddresses
from
netifaces
import
AF_INET
from
netifaces
import
AF_INET6
from
vyos.configsession
import
ConfigSession
from
vyos.ifconfig
import
Interface
from
vyos.util
import
read_file
from
vyos.util
import
cmd
from
vyos.util
import
dict_search
from
vyos.validate
import
is_intf_addr_assigned
from
vyos.validate
import
is_ipv6_link_local
def
read_mirror_rule
(
interfaces
):
Success
=
0
for
interface
in
interfaces
:
get_tc_cmd
=
'tc -j qdisc'
tmp
=
cmd
(
get_tc_cmd
,
shell
=
True
)
data
=
json
.
loads
(
tmp
)
for
rule
in
data
:
dev
=
rule
[
'dev'
]
handle
=
rule
[
'handle'
]
kind
=
rule
[
'kind'
]
if
dev
==
interface
and
handle
==
"ffff:"
and
kind
==
"ingress"
:
Success
+=
1
elif
dev
==
interface
and
handle
==
"1:"
and
kind
==
"prio"
:
Success
+=
1
return
Success
class
BasicInterfaceTest
:
class
BaseTest
(
unittest
.
TestCase
):
_test_ip
=
False
_test_mtu
=
False
_test_vlan
=
False
_test_qinq
=
False
_test_ipv6
=
False
_test_mirror
=
False
_base_path
=
[]
_options
=
{}
_interfaces
=
[]
_qinq_range
=
[
'10'
,
'20'
,
'30'
]
_vlan_range
=
[
'100'
,
'200'
,
'300'
,
'2000'
]
# choose IPv6 minimum MTU value for tests - this must always work
_mtu
=
'1280'
def
setUp
(
self
):
self
.
session
=
ConfigSession
(
os
.
getpid
())
self
.
_test_addr
=
[
'192.0.2.1/26'
,
'192.0.2.255/31'
,
'192.0.2.64/32'
,
'2001:db8:1::ffff/64'
,
'2001:db8:101::1/112'
]
self
.
_test_mtu
=
False
self
.
_options
=
{}
def
tearDown
(
self
):
# we should not remove ethernet from the overall CLI
if
'ethernet'
in
self
.
_base_path
:
for
interface
in
self
.
_interfaces
:
# when using a dedicated interface to test via TEST_ETH environment
# variable only this one will be cleared in the end - usable to test
# ethernet interfaces via SSH
self
.
session
.
delete
(
self
.
_base_path
+
[
interface
])
self
.
session
.
set
(
self
.
_base_path
+
[
interface
,
'duplex'
,
'auto'
])
self
.
session
.
set
(
self
.
_base_path
+
[
interface
,
'speed'
,
'auto'
])
else
:
self
.
session
.
delete
(
self
.
_base_path
)
self
.
session
.
commit
()
del
self
.
session
def
test_mirror
(
self
):
if
self
.
_test_mirror
:
# Create test dependency interface
self
.
session
.
set
([
'interfaces'
,
'dummy'
,
'dum0'
])
self
.
session
.
set
([
'interfaces'
,
'dummy'
,
'dum1'
])
self
.
session
.
set
([
'interfaces'
,
'bonding'
,
'bond1'
,
'member'
,
'interface'
,
'dum0'
])
self
.
session
.
set
([
'interfaces'
,
'bonding'
,
'bond1'
,
'member'
,
'interface'
,
'dum1'
])
Success
=
0
i
=
0
# Check the two-way mirror rules of ingress and egress
for
interface
in
self
.
_interfaces
:
self
.
session
.
set
(
self
.
_base_path
+
[
interface
,
'mirror'
,
'ingress'
,
'bond1'
])
self
.
session
.
set
(
self
.
_base_path
+
[
interface
,
'mirror'
,
'egress'
,
'bond1'
])
i
+=
1
self
.
session
.
commit
()
# Parse configuration
Success
=
read_mirror_rule
(
self
.
_interfaces
)
if
Success
==
i
*
2
:
self
.
assertTrue
(
True
)
else
:
self
.
assertTrue
(
False
)
i
=
0
self
.
session
.
delete
([
'interfaces'
,
'dummy'
])
self
.
session
.
delete
([
'interfaces'
,
'bonding'
])
else
:
return
None
def
test_add_description
(
self
):
"""
Check if description can be added to interface
"""
for
intf
in
self
.
_interfaces
:
test_string
=
f
'Description-Test-{intf}'
self
.
session
.
set
(
self
.
_base_path
+
[
intf
,
'description'
,
test_string
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
session
.
set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
self
.
session
.
commit
()
# Validate interface description
for
intf
in
self
.
_interfaces
:
test_string
=
f
'Description-Test-{intf}'
with
open
(
f
'/sys/class/net/{intf}/ifalias'
,
'r'
)
as
f
:
tmp
=
f
.
read
()
.
rstrip
()
self
.
assertTrue
(
tmp
,
test_string
)
def
test_add_address_single
(
self
):
"""
Check if a single address can be added to interface.
"""
addr
=
'192.0.2.0/31'
for
intf
in
self
.
_interfaces
:
self
.
session
.
set
(
self
.
_base_path
+
[
intf
,
'address'
,
addr
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
session
.
set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
self
.
session
.
commit
()
for
intf
in
self
.
_interfaces
:
self
.
assertTrue
(
is_intf_addr_assigned
(
intf
,
addr
))
def
test_add_address_multi
(
self
):
"""
Check if IPv4/IPv6 addresses can be added to interface.
"""
# Add address
for
intf
in
self
.
_interfaces
:
for
addr
in
self
.
_test_addr
:
self
.
session
.
set
(
self
.
_base_path
+
[
intf
,
'address'
,
addr
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
session
.
set
(
self
.
_base_path
+
[
intf
]
+
option
.
split
())
self
.
session
.
commit
()
# Validate address
for
intf
in
self
.
_interfaces
:
for
af
in
AF_INET
,
AF_INET6
:
for
addr
in
ifaddresses
(
intf
)[
af
]:
# checking link local addresses makes no sense
if
is_ipv6_link_local
(
addr
[
'addr'
]):
continue
self
.
assertTrue
(
is_intf_addr_assigned
(
intf
,
addr
[
'addr'
]))
def
test_ipv6_link_local
(
self
):
""" Common function for IPv6 link-local address assignemnts """
if
not
self
.
_test_ipv6
:
return
None
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
session
.
set
(
base
+
option
.
split
())
# after commit we must have an IPv6 link-local address
self
.
session
.
commit
()
for
interface
in
self
.
_interfaces
:
for
addr
in
ifaddresses
(
interface
)[
AF_INET6
]:
self
.
assertTrue
(
is_ipv6_link_local
(
addr
[
'addr'
]))
# disable IPv6 link-local address assignment
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
self
.
session
.
set
(
base
+
[
'ipv6'
,
'address'
,
'no-default-link-local'
])
# after commit we must have no IPv6 link-local address
self
.
session
.
commit
()
for
interface
in
self
.
_interfaces
:
self
.
assertTrue
(
AF_INET6
not
in
ifaddresses
(
interface
))
def
_mtu_test
(
self
,
intf
):
""" helper function to verify MTU size """
with
open
(
f
'/sys/class/net/{intf}/mtu'
,
'r'
)
as
f
:
tmp
=
f
.
read
()
.
rstrip
()
self
.
assertEqual
(
tmp
,
self
.
_mtu
)
def
test_change_mtu
(
self
):
""" Testcase if MTU can be changed on interface """
if
not
self
.
_test_mtu
:
return
None
for
intf
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
intf
]
self
.
session
.
set
(
base
+
[
'mtu'
,
self
.
_mtu
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
session
.
set
(
base
+
option
.
split
())
# commit interface changes
self
.
session
.
commit
()
# verify changed MTU
for
intf
in
self
.
_interfaces
:
self
.
_mtu_test
(
intf
)
def
test_change_mtu_1200
(
self
):
""" Testcase if MTU can be changed to 1200 on non IPv6 enabled interfaces """
if
not
self
.
_test_mtu
:
return
None
old_mtu
=
self
.
_mtu
self
.
_mtu
=
'1200'
for
intf
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
intf
]
self
.
session
.
set
(
base
+
[
'mtu'
,
self
.
_mtu
])
self
.
session
.
set
(
base
+
[
'ipv6'
,
'address'
,
'no-default-link-local'
])
for
option
in
self
.
_options
.
get
(
intf
,
[]):
self
.
session
.
set
(
base
+
option
.
split
())
# commit interface changes
self
.
session
.
commit
()
# verify changed MTU
for
intf
in
self
.
_interfaces
:
self
.
_mtu_test
(
intf
)
self
.
_mtu
=
old_mtu
def
test_8021q_vlan
(
self
):
""" Testcase for 802.1q VLAN interfaces """
if
not
self
.
_test_vlan
:
return
None
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
session
.
set
(
base
+
option
.
split
())
for
vlan
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif'
,
vlan
]
self
.
session
.
set
(
base
+
[
'mtu'
,
self
.
_mtu
])
for
address
in
self
.
_test_addr
:
self
.
session
.
set
(
base
+
[
'address'
,
address
])
self
.
session
.
commit
()
for
intf
in
self
.
_interfaces
:
for
vlan
in
self
.
_vlan_range
:
vif
=
f
'{intf}.{vlan}'
for
address
in
self
.
_test_addr
:
self
.
assertTrue
(
is_intf_addr_assigned
(
vif
,
address
))
self
.
_mtu_test
(
vif
)
def
test_8021ad_qinq_vlan
(
self
):
""" Testcase for 802.1ad Q-in-Q VLAN interfaces """
if
not
self
.
_test_qinq
:
return
None
for
interface
in
self
.
_interfaces
:
base
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
session
.
set
(
base
+
option
.
split
())
for
vif_s
in
self
.
_qinq_range
:
for
vif_c
in
self
.
_vlan_range
:
base
=
self
.
_base_path
+
[
interface
,
'vif-s'
,
vif_s
,
'vif-c'
,
vif_c
]
self
.
session
.
set
(
base
+
[
'mtu'
,
self
.
_mtu
])
for
address
in
self
.
_test_addr
:
self
.
session
.
set
(
base
+
[
'address'
,
address
])
self
.
session
.
commit
()
for
interface
in
self
.
_interfaces
:
for
vif_s
in
self
.
_qinq_range
:
tmp
=
json
.
loads
(
cmd
(
f
'ip -d -j link show dev {interface}.{vif_s}'
))[
0
]
self
.
assertEqual
(
dict_search
(
'linkinfo.info_data.protocol'
,
tmp
),
'802.1ad'
)
for
vif_c
in
self
.
_vlan_range
:
vif
=
f
'{interface}.{vif_s}.{vif_c}'
for
address
in
self
.
_test_addr
:
self
.
assertTrue
(
is_intf_addr_assigned
(
vif
,
address
))
self
.
_mtu_test
(
vif
)
def
test_ip_options
(
self
):
""" Test interface base IPv4 options """
if
not
self
.
_test_ip
:
return
None
for
interface
in
self
.
_interfaces
:
arp_tmo
=
'300'
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
session
.
set
(
path
+
option
.
split
())
# Options
self
.
session
.
set
(
path
+
[
'ip'
,
'arp-cache-timeout'
,
arp_tmo
])
self
.
session
.
set
(
path
+
[
'ip'
,
'disable-arp-filter'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'disable-forwarding'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'enable-arp-accept'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'enable-arp-announce'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'enable-arp-ignore'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'enable-proxy-arp'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'proxy-arp-pvlan'
])
self
.
session
.
set
(
path
+
[
'ip'
,
'source-validation'
,
'loose'
])
self
.
session
.
commit
()
for
interface
in
self
.
_interfaces
:
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/neigh/{interface}/base_reachable_time_ms'
)
self
.
assertEqual
(
tmp
,
str
((
int
(
arp_tmo
)
*
1000
)))
# tmo value is in milli seconds
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/arp_filter'
)
self
.
assertEqual
(
'0'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/arp_accept'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/arp_announce'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/arp_ignore'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/forwarding'
)
self
.
assertEqual
(
'0'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/proxy_arp'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/proxy_arp_pvlan'
)
self
.
assertEqual
(
'1'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv4/conf/{interface}/rp_filter'
)
self
.
assertEqual
(
'2'
,
tmp
)
def
test_ipv6_options
(
self
):
""" Test interface base IPv6 options """
if
not
self
.
_test_ipv6
:
return
None
for
interface
in
self
.
_interfaces
:
dad_transmits
=
'10'
path
=
self
.
_base_path
+
[
interface
]
for
option
in
self
.
_options
.
get
(
interface
,
[]):
self
.
session
.
set
(
path
+
option
.
split
())
# Options
self
.
session
.
set
(
path
+
[
'ipv6'
,
'disable-forwarding'
])
self
.
session
.
set
(
path
+
[
'ipv6'
,
'dup-addr-detect-transmits'
,
dad_transmits
])
self
.
session
.
commit
()
for
interface
in
self
.
_interfaces
:
tmp
=
read_file
(
f
'/proc/sys/net/ipv6/conf/{interface}/forwarding'
)
self
.
assertEqual
(
'0'
,
tmp
)
tmp
=
read_file
(
f
'/proc/sys/net/ipv6/conf/{interface}/dad_transmits'
)
self
.
assertEqual
(
dad_transmits
,
tmp
)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Dec 15, 5:35 PM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3102277
Default Alt Text
base_interfaces_test.py (15 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment