Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F38930335
test_protocols_pim.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
8 KB
Referenced Files
None
Subscribers
None
test_protocols_pim.py
View Options
#!/usr/bin/env python3
#
# Copyright (C) 2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest

from base_vyostest_shim import VyOSUnitTestSHIM
from base_vyostest_shim import CSTORE_GUARD_TIME

from vyos.configsession import ConfigSessionError
from vyos.frrender import pim_daemon
from vyos.ifconfig import Section
from vyos.utils.process import process_named_running
# CLI path prefix shared by every set/delete call in this test module.
base_path = ['protocols', 'pim']
class TestProtocolsPIM(VyOSUnitTestSHIM.TestCase):
    """Smoketests for the CLI nodes below ``base_path`` (``protocols pim``).

    Every test sets CLI nodes, commits them and then checks that the expected
    lines are present in the text returned by ``getFRRconfig()``.
    """

    @classmethod
    def setUpClass(cls):
        """Run the base-class setup, wipe existing PIM config, set guard time."""
        # call base-classes classmethod
        super().setUpClass()

        # ensure we can also run this test on a live system - so lets clean
        # out the current configuration :)
        cls.cli_delete(cls, base_path)

        # Enable CSTORE guard time required by FRR related tests
        cls._commit_guard_time = CSTORE_GUARD_TIME

    def tearDown(self):
        """Delete the PIM configuration and check the daemon state around it."""
        # pimd process must be running
        self.assertTrue(process_named_running(pim_daemon))

        self.cli_delete(base_path)
        self.cli_commit()

        # pimd process must be stopped by now
        self.assertFalse(process_named_running(pim_daemon))

    def test_01_pim_basic(self):
        """Set an RP plus per-interface PIM options and verify the FRR output."""
        rp = '127.0.0.1'
        group = '224.0.0.0/4'
        hello = '100'
        dr_priority = '64'

        self.cli_set(base_path + ['rp', 'address', rp, 'group', group])

        interfaces = Section.interfaces('ethernet')
        for interface in interfaces:
            iface_path = base_path + ['interface', interface]
            self.cli_set(iface_path + ['bfd'])
            self.cli_set(iface_path + ['dr-priority', dr_priority])
            self.cli_set(iface_path + ['hello', hello])
            self.cli_set(iface_path + ['no-bsm'])
            self.cli_set(iface_path + ['no-unicast-bsm'])
            self.cli_set(iface_path + ['passive'])

        # commit changes
        self.cli_commit()

        # Verify FRR pimd configuration
        frrconfig = self.getFRRconfig('router pim', endsection='^exit')
        self.assertIn(f' rp {rp} {group}', frrconfig)

        for interface in interfaces:
            frrconfig = self.getFRRconfig(
                f'interface {interface}', endsection='^exit')
            self.assertIn(f'interface {interface}', frrconfig)
            self.assertIn(' ip pim', frrconfig)
            self.assertIn(' ip pim bfd', frrconfig)
            self.assertIn(f' ip pim drpriority {dr_priority}', frrconfig)
            self.assertIn(f' ip pim hello {hello}', frrconfig)
            self.assertIn(' no ip pim bsm', frrconfig)
            self.assertIn(' no ip pim unicast-bsm', frrconfig)
            self.assertIn(' ip pim passive', frrconfig)

        # NOTE(review): nothing was changed since the previous commit, so this
        # second commit looks redundant - confirm before removing it.
        self.cli_commit()

    def test_02_pim_advanced(self):
        """Set the global PIM options and verify the ``router pim`` section."""
        rp = '127.0.0.2'
        group = '224.0.0.0/4'
        join_prune_interval = '123'
        rp_keep_alive_timer = '190'
        keep_alive_timer = '180'
        packets = '10'
        prefix_list = 'pim-test'
        register_suppress_time = '300'

        self.cli_set(base_path + ['rp', 'address', rp, 'group', group])
        self.cli_set(base_path + ['rp', 'keep-alive-timer', rp_keep_alive_timer])

        self.cli_set(base_path + ['ecmp', 'rebalance'])
        self.cli_set(base_path + ['join-prune-interval', join_prune_interval])
        self.cli_set(base_path + ['keep-alive-timer', keep_alive_timer])
        self.cli_set(base_path + ['packets', packets])
        self.cli_set(base_path + ['register-accept-list', 'prefix-list', prefix_list])
        self.cli_set(base_path + ['register-suppress-time', register_suppress_time])
        self.cli_set(base_path + ['no-v6-secondary'])
        self.cli_set(base_path + ['spt-switchover', 'infinity-and-beyond',
                                  'prefix-list', prefix_list])
        self.cli_set(base_path + ['ssm', 'prefix-list', prefix_list])

        # check validate() - PIM require defined interfaces!
        with self.assertRaises(ConfigSessionError):
            self.cli_commit()

        interfaces = Section.interfaces('ethernet')
        for interface in interfaces:
            self.cli_set(base_path + ['interface', interface])

        # commit changes
        self.cli_commit()

        # Verify FRR pimd configuration
        frrconfig = self.getFRRconfig('router pim', endsection='^exit')
        self.assertIn(' no send-v6-secondary', frrconfig)
        self.assertIn(f' rp {rp} {group}', frrconfig)
        self.assertIn(f' register-suppress-time {register_suppress_time}', frrconfig)
        self.assertIn(f' join-prune-interval {join_prune_interval}', frrconfig)
        self.assertIn(f' packets {packets}', frrconfig)
        self.assertIn(f' keep-alive-timer {keep_alive_timer}', frrconfig)
        self.assertIn(f' rp keep-alive-timer {rp_keep_alive_timer}', frrconfig)
        self.assertIn(f' ssm prefix-list {prefix_list}', frrconfig)
        self.assertIn(f' register-accept-list {prefix_list}', frrconfig)
        self.assertIn(
            f' spt-switchover infinity-and-beyond prefix-list {prefix_list}',
            frrconfig)
        self.assertIn(' ecmp rebalance', frrconfig)

    def test_03_pim_igmp_proxy(self):
        """Check that a commit with both PIM and IGMP proxy set is rejected."""
        igmp_proxy = ['protocols', 'igmp-proxy']
        rp = '127.0.0.1'
        group = '224.0.0.0/4'

        self.cli_set(base_path)
        self.cli_set(igmp_proxy)

        # check validate() - can not set both IGMP proxy and PIM
        with self.assertRaises(ConfigSessionError):
            self.cli_commit()

        self.cli_delete(igmp_proxy)

        self.cli_set(base_path + ['rp', 'address', rp, 'group', group])
        interfaces = Section.interfaces('ethernet')
        for interface in interfaces:
            self.cli_set(base_path + ['interface', interface, 'bfd'])

        # commit changes
        self.cli_commit()

    def test_04_igmp(self):
        """Set IGMP options and joins per interface and verify the FRR output."""
        watermark_warning = '2000'
        query_interval = '1000'
        query_max_response_time = '200'
        version = '2'

        # Groups to join; 'source' may be a list of sources, empty, or absent
        igmp_join = {
            '224.1.1.1': {'source': ['1.1.1.1', '2.2.2.2', '3.3.3.3']},
            '224.1.2.2': {'source': []},
            '224.1.3.3': {},
        }

        self.cli_set(base_path + ['igmp', 'watermark-warning', watermark_warning])

        interfaces = Section.interfaces('ethernet')
        for interface in interfaces:
            igmp_path = base_path + ['interface', interface, 'igmp']
            self.cli_set(igmp_path + ['version', version])
            self.cli_set(igmp_path + ['query-interval', query_interval])
            self.cli_set(igmp_path + ['query-max-response-time',
                                      query_max_response_time])

            for join, join_config in igmp_join.items():
                self.cli_set(igmp_path + ['join', join])
                for source in join_config.get('source', []):
                    self.cli_set(
                        igmp_path + ['join', join, 'source-address', source])

        self.cli_commit()

        frrconfig = self.getFRRconfig()
        self.assertIn(f'ip igmp watermark-warn {watermark_warning}', frrconfig)

        for interface in interfaces:
            frrconfig = self.getFRRconfig(
                f'interface {interface}', endsection='^exit')
            self.assertIn(f'interface {interface}', frrconfig)
            self.assertIn(' ip igmp', frrconfig)
            self.assertIn(f' ip igmp version {version}', frrconfig)
            self.assertIn(f' ip igmp query-interval {query_interval}', frrconfig)
            self.assertIn(
                f' ip igmp query-max-response-time {query_max_response_time}',
                frrconfig)

            for join, join_config in igmp_join.items():
                sources = join_config.get('source')
                # Branch on truthiness, not key presence: an empty source
                # list must still get its plain join-group line verified.
                if sources:
                    for source in sources:
                        self.assertIn(
                            f' ip igmp join-group {join} {source}', frrconfig)
                else:
                    self.assertIn(f' ip igmp join-group {join}', frrconfig)
# Run all tests in this module with verbose output when executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Tue, Dec 16, 4:00 AM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3072267
Default Alt Text
test_protocols_pim.py (8 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment