Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F38930239
test_protocols_bgp.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
35 KB
Referenced Files
None
Subscribers
None
test_protocols_bgp.py
View Options
#!/usr/bin/env python3
#
# Copyright (C) 2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
unittest
from
base_vyostest_shim
import
VyOSUnitTestSHIM
from
vyos.configsession
import
ConfigSessionError
from
vyos.template
import
is_ipv6
from
vyos.util
import
process_named_running
PROCESS_NAME
=
'bgpd'
ASN
=
'64512'
base_path
=
[
'protocols'
,
'bgp'
]
route_map_in
=
'foo-map-in'
route_map_out
=
'foo-map-out'
prefix_list_in
=
'pfx-foo-in'
prefix_list_out
=
'pfx-foo-out'
prefix_list_in6
=
'pfx-foo-in6'
prefix_list_out6
=
'pfx-foo-out6'
neighbor_config
=
{
'192.0.2.1'
:
{
'cap_dynamic'
:
''
,
'cap_ext_next'
:
''
,
'remote_as'
:
'100'
,
'adv_interv'
:
'400'
,
'passive'
:
''
,
'password'
:
'VyOS-Secure123'
,
'shutdown'
:
''
,
'cap_over'
:
''
,
'ttl_security'
:
'5'
,
'local_as'
:
'300'
,
'route_map_in'
:
route_map_in
,
'route_map_out'
:
route_map_out
,
'no_send_comm_ext'
:
''
,
'addpath_all'
:
''
,
},
'192.0.2.2'
:
{
'remote_as'
:
'200'
,
'shutdown'
:
''
,
'no_cap_nego'
:
''
,
'port'
:
'667'
,
'cap_strict'
:
''
,
'pfx_list_in'
:
prefix_list_in
,
'pfx_list_out'
:
prefix_list_out
,
'no_send_comm_std'
:
''
,
},
'192.0.2.3'
:
{
'description'
:
'foo bar baz'
,
'remote_as'
:
'200'
,
'passive'
:
''
,
'multi_hop'
:
'5'
,
'update_src'
:
'lo'
,
},
'2001:db8::1'
:
{
'cap_dynamic'
:
''
,
'cap_ext_next'
:
''
,
'remote_as'
:
'123'
,
'adv_interv'
:
'400'
,
'passive'
:
''
,
'password'
:
'VyOS-Secure123'
,
'shutdown'
:
''
,
'cap_over'
:
''
,
'ttl_security'
:
'5'
,
'local_as'
:
'300'
,
'solo'
:
''
,
'route_map_in'
:
route_map_in
,
'route_map_out'
:
route_map_out
,
'no_send_comm_std'
:
''
,
'addpath_per_as'
:
''
,
},
'2001:db8::2'
:
{
'remote_as'
:
'456'
,
'shutdown'
:
''
,
'no_cap_nego'
:
''
,
'port'
:
'667'
,
'cap_strict'
:
''
,
'pfx_list_in'
:
prefix_list_in6
,
'pfx_list_out'
:
prefix_list_out6
,
'no_send_comm_ext'
:
''
,
},
}
peer_group_config
=
{
'foo'
:
{
'remote_as'
:
'100'
,
'passive'
:
''
,
'password'
:
'VyOS-Secure123'
,
'shutdown'
:
''
,
'cap_over'
:
''
,
'ttl_security'
:
'5'
,
},
'bar'
:
{
'description'
:
'foo peer bar group'
,
'remote_as'
:
'200'
,
'shutdown'
:
''
,
'no_cap_nego'
:
''
,
'local_as'
:
'300'
,
'pfx_list_in'
:
prefix_list_in
,
'pfx_list_out'
:
prefix_list_out
,
'no_send_comm_ext'
:
''
,
},
'baz'
:
{
'cap_dynamic'
:
''
,
'cap_ext_next'
:
''
,
'remote_as'
:
'200'
,
'passive'
:
''
,
'multi_hop'
:
'5'
,
'update_src'
:
'lo'
,
'route_map_in'
:
route_map_in
,
'route_map_out'
:
route_map_out
,
},
}
class
TestProtocolsBGP
(
VyOSUnitTestSHIM
.
TestCase
):
def
setUp
(
self
):
self
.
cli_set
([
'policy'
,
'route-map'
,
route_map_in
,
'rule'
,
'10'
,
'action'
,
'permit'
])
self
.
cli_set
([
'policy'
,
'route-map'
,
route_map_out
,
'rule'
,
'10'
,
'action'
,
'permit'
])
self
.
cli_set
([
'policy'
,
'prefix-list'
,
prefix_list_in
,
'rule'
,
'10'
,
'action'
,
'permit'
])
self
.
cli_set
([
'policy'
,
'prefix-list'
,
prefix_list_in
,
'rule'
,
'10'
,
'prefix'
,
'192.0.2.0/25'
])
self
.
cli_set
([
'policy'
,
'prefix-list'
,
prefix_list_out
,
'rule'
,
'10'
,
'action'
,
'permit'
])
self
.
cli_set
([
'policy'
,
'prefix-list'
,
prefix_list_out
,
'rule'
,
'10'
,
'prefix'
,
'192.0.2.128/25'
])
self
.
cli_set
([
'policy'
,
'prefix-list6'
,
prefix_list_in6
,
'rule'
,
'10'
,
'action'
,
'permit'
])
self
.
cli_set
([
'policy'
,
'prefix-list6'
,
prefix_list_in6
,
'rule'
,
'10'
,
'prefix'
,
'2001:db8:1000::/64'
])
self
.
cli_set
([
'policy'
,
'prefix-list6'
,
prefix_list_out6
,
'rule'
,
'10'
,
'action'
,
'deny'
])
self
.
cli_set
([
'policy'
,
'prefix-list6'
,
prefix_list_out6
,
'rule'
,
'10'
,
'prefix'
,
'2001:db8:2000::/64'
])
self
.
cli_set
(
base_path
+
[
'local-as'
,
ASN
])
def
tearDown
(
self
):
self
.
cli_delete
([
'policy'
])
self
.
cli_delete
([
'vrf'
])
self
.
cli_delete
(
base_path
)
self
.
cli_commit
()
# Check for running process
self
.
assertTrue
(
process_named_running
(
PROCESS_NAME
))
def
verify_frr_config
(
self
,
peer
,
peer_config
,
frrconfig
):
# recurring patterns to verify for both a simple neighbor and a peer-group
if
'cap_dynamic'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} capability dynamic'
,
frrconfig
)
if
'cap_ext_next'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} capability extended-nexthop'
,
frrconfig
)
if
'description'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} description {peer_config["description"]}'
,
frrconfig
)
if
'no_cap_nego'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} dont-capability-negotiate'
,
frrconfig
)
if
'multi_hop'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} ebgp-multihop {peer_config["multi_hop"]}'
,
frrconfig
)
if
'local_as'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} local-as {peer_config["local_as"]} no-prepend replace-as'
,
frrconfig
)
if
'cap_over'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} override-capability'
,
frrconfig
)
if
'passive'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} passive'
,
frrconfig
)
if
'password'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} password {peer_config["password"]}'
,
frrconfig
)
if
'remote_as'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} remote-as {peer_config["remote_as"]}'
,
frrconfig
)
if
'solo'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} solo'
,
frrconfig
)
if
'shutdown'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} shutdown'
,
frrconfig
)
if
'ttl_security'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} ttl-security hops {peer_config["ttl_security"]}'
,
frrconfig
)
if
'update_src'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} update-source {peer_config["update_src"]}'
,
frrconfig
)
if
'route_map_in'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} route-map {peer_config["route_map_in"]} in'
,
frrconfig
)
if
'route_map_out'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} route-map {peer_config["route_map_out"]} out'
,
frrconfig
)
if
'pfx_list_in'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} prefix-list {peer_config["pfx_list_in"]} in'
,
frrconfig
)
if
'pfx_list_out'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} prefix-list {peer_config["pfx_list_out"]} out'
,
frrconfig
)
if
'no_send_comm_std'
in
peer_config
:
self
.
assertIn
(
f
' no neighbor {peer} send-community'
,
frrconfig
)
if
'no_send_comm_ext'
in
peer_config
:
self
.
assertIn
(
f
' no neighbor {peer} send-community extended'
,
frrconfig
)
if
'addpath_all'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} addpath-tx-all-paths'
,
frrconfig
)
if
'addpath_per_as'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} addpath-tx-bestpath-per-AS'
,
frrconfig
)
def
test_bgp_01_simple
(
self
):
router_id
=
'127.0.0.1'
local_pref
=
'500'
stalepath_time
=
'60'
max_path_v4
=
'2'
max_path_v4ibgp
=
'4'
max_path_v6
=
'8'
max_path_v6ibgp
=
'16'
self
.
cli_set
(
base_path
+
[
'parameters'
,
'router-id'
,
router_id
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'log-neighbor-changes'
])
# Local AS number MUST be defined - as this is set in setUp() we remove
# this once for testing of the proper error
self
.
cli_delete
(
base_path
+
[
'local-as'
])
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
self
.
cli_set
(
base_path
+
[
'local-as'
,
ASN
])
# Default local preference (higher = more preferred, default value is 100)
self
.
cli_set
(
base_path
+
[
'parameters'
,
'default'
,
'local-pref'
,
local_pref
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'graceful-restart'
,
'stalepath-time'
,
stalepath_time
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'graceful-shutdown'
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'ebgp-requires-policy'
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'bestpath'
,
'as-path'
,
'multipath-relax'
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'bestpath'
,
'bandwidth'
,
'default-weight-for-missing'
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'bestpath'
,
'compare-routerid'
])
# AFI maximum path support
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-unicast'
,
'maximum-paths'
,
'ebgp'
,
max_path_v4
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-unicast'
,
'maximum-paths'
,
'ibgp'
,
max_path_v4ibgp
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-unicast'
,
'maximum-paths'
,
'ebgp'
,
max_path_v6
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-unicast'
,
'maximum-paths'
,
'ibgp'
,
max_path_v6ibgp
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp router-id {router_id}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp log-neighbor-changes'
,
frrconfig
)
self
.
assertIn
(
f
' bgp default local-preference {local_pref}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp graceful-restart stalepath-time {stalepath_time}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp graceful-shutdown'
,
frrconfig
)
self
.
assertIn
(
f
' bgp bestpath as-path multipath-relax'
,
frrconfig
)
self
.
assertIn
(
f
' bgp bestpath bandwidth default-weight-for-missing'
,
frrconfig
)
self
.
assertIn
(
f
' bgp bestpath compare-routerid'
,
frrconfig
)
self
.
assertNotIn
(
f
'bgp ebgp-requires-policy'
,
frrconfig
)
afiv4_config
=
self
.
getFRRconfig
(
' address-family ipv4 unicast'
)
self
.
assertIn
(
f
' maximum-paths {max_path_v4}'
,
afiv4_config
)
self
.
assertIn
(
f
' maximum-paths ibgp {max_path_v4ibgp}'
,
afiv4_config
)
afiv6_config
=
self
.
getFRRconfig
(
' address-family ipv6 unicast'
)
self
.
assertIn
(
f
' maximum-paths {max_path_v6}'
,
afiv6_config
)
self
.
assertIn
(
f
' maximum-paths ibgp {max_path_v6ibgp}'
,
afiv6_config
)
def
test_bgp_02_neighbors
(
self
):
# Test out individual neighbor configuration items, not all of them are
# also available to a peer-group!
for
peer
,
peer_config
in
neighbor_config
.
items
():
afi
=
'ipv4-unicast'
if
is_ipv6
(
peer
):
afi
=
'ipv6-unicast'
if
'adv_interv'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'advertisement-interval'
,
peer_config
[
"adv_interv"
]])
if
'cap_dynamic'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'capability'
,
'dynamic'
])
if
'cap_ext_next'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'capability'
,
'extended-nexthop'
])
if
'description'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'description'
,
peer_config
[
"description"
]])
if
'no_cap_nego'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'disable-capability-negotiation'
])
if
'multi_hop'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'ebgp-multihop'
,
peer_config
[
"multi_hop"
]])
if
'local_as'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'local-as'
,
peer_config
[
"local_as"
],
'no-prepend'
,
'replace-as'
])
if
'cap_over'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'override-capability'
])
if
'passive'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'passive'
])
if
'password'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'password'
,
peer_config
[
"password"
]])
if
'port'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'port'
,
peer_config
[
"port"
]])
if
'remote_as'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'remote-as'
,
peer_config
[
"remote_as"
]])
if
'cap_strict'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'strict-capability-match'
])
if
'shutdown'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'shutdown'
])
if
'solo'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'solo'
])
if
'ttl_security'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'ttl-security'
,
'hops'
,
peer_config
[
"ttl_security"
]])
if
'update_src'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'update-source'
,
peer_config
[
"update_src"
]])
if
'route_map_in'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'route-map'
,
'import'
,
peer_config
[
"route_map_in"
]])
if
'route_map_out'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'route-map'
,
'export'
,
peer_config
[
"route_map_out"
]])
if
'pfx_list_in'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'prefix-list'
,
'import'
,
peer_config
[
"pfx_list_in"
]])
if
'pfx_list_out'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'prefix-list'
,
'export'
,
peer_config
[
"pfx_list_out"
]])
if
'no_send_comm_std'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'disable-send-community'
,
'standard'
])
if
'no_send_comm_ext'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'disable-send-community'
,
'extended'
])
if
'addpath_all'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'addpath-tx-all'
])
if
'addpath_per_as'
in
peer_config
:
self
.
cli_set
(
base_path
+
[
'neighbor'
,
peer
,
'address-family'
,
afi
,
'addpath-tx-per-as'
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
for
peer
,
peer_config
in
neighbor_config
.
items
():
if
'adv_interv'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} advertisement-interval {peer_config["adv_interv"]}'
,
frrconfig
)
if
'port'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} port {peer_config["port"]}'
,
frrconfig
)
if
'cap_strict'
in
peer_config
:
self
.
assertIn
(
f
' neighbor {peer} strict-capability-match'
,
frrconfig
)
self
.
verify_frr_config
(
peer
,
peer_config
,
frrconfig
)
def
test_bgp_03_peer_groups
(
self
):
# Test out individual peer-group configuration items
for
peer_group
,
config
in
peer_group_config
.
items
():
if
'cap_dynamic'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'capability'
,
'dynamic'
])
if
'cap_ext_next'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'capability'
,
'extended-nexthop'
])
if
'description'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'description'
,
config
[
"description"
]])
if
'no_cap_nego'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'disable-capability-negotiation'
])
if
'multi_hop'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'ebgp-multihop'
,
config
[
"multi_hop"
]])
if
'local_as'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'local-as'
,
config
[
"local_as"
],
'no-prepend'
,
'replace-as'
])
if
'cap_over'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'override-capability'
])
if
'passive'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'passive'
])
if
'password'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'password'
,
config
[
"password"
]])
if
'remote_as'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'remote-as'
,
config
[
"remote_as"
]])
if
'shutdown'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'shutdown'
])
if
'ttl_security'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'ttl-security'
,
'hops'
,
config
[
"ttl_security"
]])
if
'update_src'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'update-source'
,
config
[
"update_src"
]])
if
'route_map_in'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'route-map'
,
'import'
,
config
[
"route_map_in"
]])
if
'route_map_out'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'route-map'
,
'export'
,
config
[
"route_map_out"
]])
if
'pfx_list_in'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'prefix-list'
,
'import'
,
config
[
"pfx_list_in"
]])
if
'pfx_list_out'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'prefix-list'
,
'export'
,
config
[
"pfx_list_out"
]])
if
'no_send_comm_std'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'disable-send-community'
,
'standard'
])
if
'no_send_comm_ext'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'disable-send-community'
,
'extended'
])
if
'addpath_all'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'addpath-tx-all'
])
if
'addpath_per_as'
in
config
:
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'address-family'
,
'ipv4-unicast'
,
'addpath-tx-per-as'
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
for
peer
,
peer_config
in
peer_group_config
.
items
():
self
.
assertIn
(
f
' neighbor {peer_group} peer-group'
,
frrconfig
)
self
.
verify_frr_config
(
peer
,
peer_config
,
frrconfig
)
def
test_bgp_04_afi_ipv4
(
self
):
networks
=
{
'10.0.0.0/8'
:
{
'as_set'
:
''
,
},
'100.64.0.0/10'
:
{
'as_set'
:
''
,
},
'192.168.0.0/16'
:
{
'summary_only'
:
''
,
},
}
# We want to redistribute ...
redistributes
=
[
'connected'
,
'isis'
,
'kernel'
,
'ospf'
,
'rip'
,
'static'
]
for
redistribute
in
redistributes
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-unicast'
,
'redistribute'
,
redistribute
])
for
network
,
network_config
in
networks
.
items
():
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-unicast'
,
'network'
,
network
])
if
'as_set'
in
network_config
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-unicast'
,
'aggregate-address'
,
network
,
'as-set'
])
if
'summary_only'
in
network_config
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-unicast'
,
'aggregate-address'
,
network
,
'summary-only'
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' address-family ipv4 unicast'
,
frrconfig
)
for
redistribute
in
redistributes
:
self
.
assertIn
(
f
' redistribute {redistribute}'
,
frrconfig
)
for
network
,
network_config
in
networks
.
items
():
self
.
assertIn
(
f
' network {network}'
,
frrconfig
)
if
'as_set'
in
network_config
:
self
.
assertIn
(
f
' aggregate-address {network} as-set'
,
frrconfig
)
if
'summary_only'
in
network_config
:
self
.
assertIn
(
f
' aggregate-address {network} summary-only'
,
frrconfig
)
def
test_bgp_05_afi_ipv6
(
self
):
networks
=
{
'2001:db8:100::/48'
:
{
},
'2001:db8:200::/48'
:
{
},
'2001:db8:300::/48'
:
{
'summary_only'
:
''
,
},
}
# We want to redistribute ...
redistributes
=
[
'connected'
,
'kernel'
,
'ospfv3'
,
'ripng'
,
'static'
]
for
redistribute
in
redistributes
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-unicast'
,
'redistribute'
,
redistribute
])
for
network
,
network_config
in
networks
.
items
():
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-unicast'
,
'network'
,
network
])
if
'summary_only'
in
network_config
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-unicast'
,
'aggregate-address'
,
network
,
'summary-only'
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' address-family ipv6 unicast'
,
frrconfig
)
# T2100: By default ebgp-requires-policy is disabled to keep VyOS
# 1.3 and 1.2 backwards compatibility
self
.
assertIn
(
f
' no bgp ebgp-requires-policy'
,
frrconfig
)
for
redistribute
in
redistributes
:
# FRR calls this OSPF6
if
redistribute
==
'ospfv3'
:
redistribute
=
'ospf6'
self
.
assertIn
(
f
' redistribute {redistribute}'
,
frrconfig
)
for
network
,
network_config
in
networks
.
items
():
self
.
assertIn
(
f
' network {network}'
,
frrconfig
)
if
'as_set'
in
network_config
:
self
.
assertIn
(
f
' aggregate-address {network} summary-only'
,
frrconfig
)
def
test_bgp_06_listen_range
(
self
):
# Implemented via T1875
limit
=
'64'
listen_ranges
=
[
'192.0.2.0/25'
,
'192.0.2.128/25'
]
peer_group
=
'listenfoobar'
self
.
cli_set
(
base_path
+
[
'listen'
,
'limit'
,
limit
])
for
prefix
in
listen_ranges
:
self
.
cli_set
(
base_path
+
[
'listen'
,
'range'
,
prefix
])
# check validate() - peer-group must be defined for range/prefix
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
self
.
cli_set
(
base_path
+
[
'listen'
,
'range'
,
prefix
,
'peer-group'
,
peer_group
])
# check validate() - peer-group does yet not exist!
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
self
.
cli_set
(
base_path
+
[
'peer-group'
,
peer_group
,
'remote-as'
,
ASN
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' neighbor {peer_group} peer-group'
,
frrconfig
)
self
.
assertIn
(
f
' neighbor {peer_group} remote-as {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp listen limit {limit}'
,
frrconfig
)
for
prefix
in
listen_ranges
:
self
.
assertIn
(
f
' bgp listen range {prefix} peer-group {peer_group}'
,
frrconfig
)
def
test_bgp_07_l2vpn_evpn
(
self
):
vnis
=
[
'10010'
,
'10020'
,
'10030'
]
neighbors
=
[
'192.0.2.10'
,
'192.0.2.20'
,
'192.0.2.30'
]
self
.
cli_set
(
base_path
+
[
'address-family'
,
'l2vpn-evpn'
,
'advertise-all-vni'
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'l2vpn-evpn'
,
'advertise-default-gw'
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'l2vpn-evpn'
,
'advertise-svi-ip'
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'l2vpn-evpn'
,
'flooding'
,
'disable'
])
for
vni
in
vnis
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
'l2vpn-evpn'
,
'vni'
,
vni
,
'advertise-default-gw'
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
'l2vpn-evpn'
,
'vni'
,
vni
,
'advertise-svi-ip'
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' address-family l2vpn evpn'
,
frrconfig
)
self
.
assertIn
(
f
' advertise-all-vni'
,
frrconfig
)
self
.
assertIn
(
f
' advertise-default-gw'
,
frrconfig
)
self
.
assertIn
(
f
' advertise-svi-ip'
,
frrconfig
)
self
.
assertIn
(
f
' flooding disable'
,
frrconfig
)
for
vni
in
vnis
:
vniconfig
=
self
.
getFRRconfig
(
f
' vni {vni}'
)
self
.
assertIn
(
f
'vni {vni}'
,
vniconfig
)
self
.
assertIn
(
f
' advertise-default-gw'
,
vniconfig
)
self
.
assertIn
(
f
' advertise-svi-ip'
,
vniconfig
)
def
test_bgp_08_zebra_route_map
(
self
):
# Implemented because of T3328
self
.
cli_set
(
base_path
+
[
'route-map'
,
route_map_in
])
# commit changes
self
.
cli_commit
()
# Verify FRR configuration
zebra_route_map
=
f
'ip protocol bgp route-map {route_map_in}'
frrconfig
=
self
.
getFRRconfig
(
zebra_route_map
)
self
.
assertIn
(
zebra_route_map
,
frrconfig
)
# Remove the route-map again
self
.
cli_delete
(
base_path
+
[
'route-map'
])
# commit changes
self
.
cli_commit
()
# Verify FRR configuration
frrconfig
=
self
.
getFRRconfig
(
zebra_route_map
)
self
.
assertNotIn
(
zebra_route_map
,
frrconfig
)
def
test_bgp_09_distance_and_flowspec
(
self
):
distance_external
=
'25'
distance_internal
=
'30'
distance_local
=
'35'
distance_v4_prefix
=
'169.254.0.0/32'
distance_v6_prefix
=
'2001::/128'
distance_prefix_value
=
'110'
distance_families
=
[
'ipv4-unicast'
,
'ipv6-unicast'
,
'ipv4-multicast'
,
'ipv6-multicast'
]
verify_families
=
[
'ipv4 unicast'
,
'ipv6 unicast'
,
'ipv4 multicast'
,
'ipv6 multicast'
]
flowspec_families
=
[
'address-family ipv4 flowspec'
,
'address-family ipv6 flowspec'
]
flowspec_int
=
'lo'
# Per family distance support
for
family
in
distance_families
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
family
,
'distance'
,
'external'
,
distance_external
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
family
,
'distance'
,
'internal'
,
distance_internal
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
family
,
'distance'
,
'local'
,
distance_local
])
if
'ipv4'
in
family
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
family
,
'distance'
,
'prefix'
,
distance_v4_prefix
,
'distance'
,
distance_prefix_value
])
if
'ipv6'
in
family
:
self
.
cli_set
(
base_path
+
[
'address-family'
,
family
,
'distance'
,
'prefix'
,
distance_v6_prefix
,
'distance'
,
distance_prefix_value
])
# IPv4 flowspec interface check
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv4-flowspec'
,
'local-install'
,
'interface'
,
flowspec_int
])
# IPv6 flowspec interface check
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-flowspec'
,
'local-install'
,
'interface'
,
flowspec_int
])
# Commit changes
self
.
cli_commit
()
# Verify FRR distances configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
for
family
in
verify_families
:
self
.
assertIn
(
f
'address-family {family}'
,
frrconfig
)
self
.
assertIn
(
f
'distance bgp {distance_external} {distance_internal} {distance_local}'
,
frrconfig
)
if
'ipv4'
in
family
:
self
.
assertIn
(
f
'distance {distance_prefix_value} {distance_v4_prefix}'
,
frrconfig
)
if
'ipv6'
in
family
:
self
.
assertIn
(
f
'distance {distance_prefix_value} {distance_v6_prefix}'
,
frrconfig
)
# Verify FRR flowspec configuration
for
family
in
flowspec_families
:
self
.
assertIn
(
f
'{family}'
,
frrconfig
)
self
.
assertIn
(
f
'local-install {flowspec_int}'
,
frrconfig
)
def
test_bgp_10_vrf_simple
(
self
):
router_id
=
'127.0.0.3'
vrfs
=
[
'red'
,
'green'
,
'blue'
]
# It is safe to assume that when the basic VRF test works, all
# other BGP related features work, as we entirely inherit the CLI
# templates and Jinja2 FRR template.
table
=
'1000'
self
.
cli_set
(
base_path
+
[
'local-as'
,
ASN
])
# testing only one AFI is sufficient as it's generic code
for
vrf
in
vrfs
:
vrf_base
=
[
'vrf'
,
'name'
,
vrf
]
self
.
cli_set
(
vrf_base
+
[
'table'
,
table
])
self
.
cli_set
(
vrf_base
+
[
'protocols'
,
'bgp'
,
'local-as'
,
ASN
])
self
.
cli_set
(
vrf_base
+
[
'protocols'
,
'bgp'
,
'parameters'
,
'router-id'
,
router_id
])
self
.
cli_set
(
vrf_base
+
[
'protocols'
,
'bgp'
,
'route-map'
,
route_map_in
])
table
=
str
(
int
(
table
)
+
1000
)
# import VRF routes do main RIB
self
.
cli_set
(
base_path
+
[
'address-family'
,
'ipv6-unicast'
,
'import'
,
'vrf'
,
vrf
])
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' address-family ipv6 unicast'
,
frrconfig
)
for
vrf
in
vrfs
:
self
.
assertIn
(
f
' import vrf {vrf}'
,
frrconfig
)
# Verify FRR bgpd configuration
frr_vrf_config
=
self
.
getFRRconfig
(
f
'router bgp {ASN} vrf {vrf}'
)
self
.
assertIn
(
f
'router bgp {ASN} vrf {vrf}'
,
frr_vrf_config
)
self
.
assertIn
(
f
' bgp router-id {router_id}'
,
frr_vrf_config
)
# XXX: Currently this is not working as FRR() class does not support
# route-maps for multiple vrfs because the modify_section() only works
# on lines and not text blocks.
#
# vrfconfig = self.getFRRconfig(f'vrf {vrf}')
# zebra_route_map = f' ip protocol bgp route-map {route_map_in}'
# self.assertIn(zebra_route_map, vrfconfig)
def
test_bgp_11_confederation
(
self
):
router_id
=
'127.10.10.2'
confed_id
=
str
(
int
(
ASN
)
+
1
)
confed_asns
=
'10 20 30 40'
self
.
cli_set
(
base_path
+
[
'local-as'
,
ASN
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'router-id'
,
router_id
])
self
.
cli_set
(
base_path
+
[
'parameters'
,
'confederation'
,
'identifier'
,
confed_id
])
for
asn
in
confed_asns
.
split
():
self
.
cli_set
(
base_path
+
[
'parameters'
,
'confederation'
,
'peers'
,
asn
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp router-id {router_id}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp confederation identifier {confed_id}'
,
frrconfig
)
self
.
assertIn
(
f
' bgp confederation peers {confed_asns}'
,
frrconfig
)
def
test_bgp_12_v6_link_local
(
self
):
remote_asn
=
str
(
int
(
ASN
)
+
10
)
interface
=
'eth0'
self
.
cli_set
(
base_path
+
[
'local-as'
,
ASN
])
self
.
cli_set
(
base_path
+
[
'neighbor'
,
interface
,
'address-family'
,
'ipv6-unicast'
])
self
.
cli_set
(
base_path
+
[
'neighbor'
,
interface
,
'interface'
,
'v6only'
,
'remote-as'
,
remote_asn
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
self
.
assertIn
(
f
' neighbor {interface} interface v6only remote-as {remote_asn}'
,
frrconfig
)
self
.
assertIn
(
f
' address-family ipv6 unicast'
,
frrconfig
)
self
.
assertIn
(
f
' neighbor {interface} activate'
,
frrconfig
)
self
.
assertIn
(
f
' exit-address-family'
,
frrconfig
)
def
test_bgp_13_vpn
(
self
):
remote_asn
=
str
(
int
(
ASN
)
+
150
)
neighbor
=
'192.0.2.55'
vrf_name
=
'red'
label
=
'auto'
rd
=
f
'{neighbor}:{ASN}'
rt_export
=
f
'{neighbor}:1002 1.2.3.4:567'
rt_import
=
f
'{neighbor}:1003 500:100'
self
.
cli_set
(
base_path
+
[
'local-as'
,
ASN
])
# testing only one AFI is sufficient as it's generic code
for
afi
in
[
'ipv4-unicast'
,
'ipv6-unicast'
]:
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'export'
,
'vpn'
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'import'
,
'vpn'
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'label'
,
'vpn'
,
'export'
,
label
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'rd'
,
'vpn'
,
'export'
,
rd
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'route-map'
,
'vpn'
,
'export'
,
route_map_out
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'route-map'
,
'vpn'
,
'import'
,
route_map_in
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'route-target'
,
'vpn'
,
'export'
,
rt_export
])
self
.
cli_set
(
base_path
+
[
'address-family'
,
afi
,
'route-target'
,
'vpn'
,
'import'
,
rt_import
])
# commit changes
self
.
cli_commit
()
# Verify FRR bgpd configuration
frrconfig
=
self
.
getFRRconfig
(
f
'router bgp {ASN}'
)
self
.
assertIn
(
f
'router bgp {ASN}'
,
frrconfig
)
for
afi
in
[
'ipv4'
,
'ipv6'
]:
afi_config
=
self
.
getFRRconfig
(
f
' address-family {afi} unicast'
,
endsection
=
'exit-address-family'
,
daemon
=
'bgpd'
)
self
.
assertIn
(
f
'address-family {afi} unicast'
,
afi_config
)
self
.
assertIn
(
f
' export vpn'
,
afi_config
)
self
.
assertIn
(
f
' import vpn'
,
afi_config
)
self
.
assertIn
(
f
' label vpn export {label}'
,
afi_config
)
self
.
assertIn
(
f
' rd vpn export {rd}'
,
afi_config
)
self
.
assertIn
(
f
' route-map vpn export {route_map_out}'
,
afi_config
)
self
.
assertIn
(
f
' route-map vpn import {route_map_in}'
,
afi_config
)
self
.
assertIn
(
f
' rt vpn export {rt_export}'
,
afi_config
)
self
.
assertIn
(
f
' rt vpn import {rt_import}'
,
afi_config
)
self
.
assertIn
(
f
' exit-address-family'
,
afi_config
)
if
__name__
==
'__main__'
:
unittest
.
main
(
verbosity
=
2
)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Tue, Dec 16, 4:00 AM (21 h, 22 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3103271
Default Alt Text
test_protocols_bgp.py (35 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment