Page MenuHomeVyOS Platform

base_accel_ppp_test.py
No OneTemporary

Size
17 KB
Referenced Files
None
Subscribers
None

base_accel_ppp_test.py

# Copyright (C) 2020-2024 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from configparser import ConfigParser
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.template import is_ipv4
from vyos.utils.system import get_half_cpus
from vyos.utils.process import process_named_running
from vyos.utils.process import cmd
class BasicAccelPPPTest:
class TestCase(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
cls._process_name = "accel-pppd"
super(BasicAccelPPPTest.TestCase, cls).setUpClass()
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, cls._base_path)
def setUp(self):
self._gateway = "192.0.2.1"
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
self.cli_delete(self._base_path)
def tearDown(self):
# Check for running process
self.assertTrue(process_named_running(self._process_name))
self.cli_delete(self._base_path)
self.cli_commit()
# Check for running process
self.assertFalse(process_named_running(self._process_name))
def set(self, path):
self.cli_set(self._base_path + path)
def delete(self, path):
self.cli_delete(self._base_path + path)
def basic_protocol_specific_config(self):
"""
An astract method.
Initialize protocol scpecific configureations.
"""
self.assertFalse(True, msg="Function must be defined")
def initial_auth_config(self):
"""
Initialization of default authentication for all protocols
"""
self.set(
[
"authentication",
"local-users",
"username",
"vyos",
"password",
"vyos",
]
)
self.set(["authentication", "mode", "local"])
def initial_gateway_config(self):
"""
Initialization of default gateway
"""
self.set(["gateway-address", self._gateway])
def initial_pool_config(self):
"""
Initialization of default client ip pool
"""
first_pool = "SIMPLE-POOL"
self.set(["client-ip-pool", first_pool, "range", "192.0.2.0/24"])
self.set(["default-pool", first_pool])
def basic_config(self, is_auth=True, is_gateway=True, is_client_pool=True):
"""
Initialization of basic configuration
:param is_auth: authentication initialization
:type is_auth: bool
:param is_gateway: gateway initialization
:type is_gateway: bool
:param is_client_pool: client ip pool initialization
:type is_client_pool: bool
"""
self.basic_protocol_specific_config()
if is_auth:
self.initial_auth_config()
if is_gateway:
self.initial_gateway_config()
if is_client_pool:
self.initial_pool_config()
def getConfig(self, start, end="cli"):
"""
Return part of configuration from line
where the first injection of start keyword to the line
where the first injection of end keyowrd
:param start: start keyword
:type start: str
:param end: end keyword
:type end: str
:return: part of config
:rtype: str
"""
command = f'cat {self._config_file} | sed -n "/^\[{start}/,/^\[{end}/p"'
out = cmd(command)
return out
def verify(self, conf):
self.assertEqual(conf["core"]["thread-count"], str(get_half_cpus()))
def test_accel_name_servers(self):
# Verify proper Name-Server configuration for IPv4 and IPv6
self.basic_config()
nameserver = ["192.0.2.1", "192.0.2.2", "2001:db8::1"]
for ns in nameserver:
self.set(["name-server", ns])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False)
conf.read(self._config_file)
# IPv4 and IPv6 nameservers must be checked individually
for ns in nameserver:
if is_ipv4(ns):
self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]])
else:
self.assertEqual(conf["ipv6-dns"][ns], None)
def test_accel_local_authentication(self):
# Test configuration of local authentication
self.basic_config()
# upload / download limit
user = "test"
password = "test2"
static_ip = "100.100.100.101"
upload = "5000"
download = "10000"
self.set(
[
"authentication",
"local-users",
"username",
user,
"password",
password,
]
)
self.set(
[
"authentication",
"local-users",
"username",
user,
"static-ip",
static_ip,
]
)
self.set(
[
"authentication",
"local-users",
"username",
user,
"rate-limit",
"upload",
upload,
]
)
# upload rate-limit requires also download rate-limit
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.set(
[
"authentication",
"local-users",
"username",
user,
"rate-limit",
"download",
download,
]
)
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False)
conf.read(self._config_file)
# check proper path to chap-secrets file
self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets)
# basic verification
self.verify(conf)
# check local users
tmp = cmd(f"sudo cat {self._chap_secrets}")
regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}"
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
# Check local-users default value(s)
self.delete(
["authentication", "local-users", "username", user, "static-ip"]
)
# commit changes
self.cli_commit()
# check local users
tmp = cmd(f"sudo cat {self._chap_secrets}")
regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}"
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
def test_accel_radius_authentication(self):
# Test configuration of RADIUS authentication for PPPoE server
self.basic_config()
radius_server = "192.0.2.22"
radius_key = "secretVyOS"
radius_port = "2000"
radius_port_acc = "3000"
acct_interim_jitter = '10'
acct_interim_interval = '10'
acct_timeout = '30'
self.set(["authentication", "mode", "radius"])
self.set(
["authentication", "radius", "server", radius_server, "key", radius_key]
)
self.set(
[
"authentication",
"radius",
"server",
radius_server,
"port",
radius_port,
]
)
self.set(
[
"authentication",
"radius",
"server",
radius_server,
"acct-port",
radius_port_acc,
]
)
self.set(
[
"authentication",
"radius",
"acct-interim-jitter",
acct_interim_jitter,
]
)
self.set(
[
"authentication",
"radius",
"accounting-interim-interval",
acct_interim_interval,
]
)
self.set(
[
"authentication",
"radius",
"acct-timeout",
acct_timeout,
]
)
coa_server = "4.4.4.4"
coa_key = "testCoA"
self.set(
["authentication", "radius", "dynamic-author", "server", coa_server]
)
self.set(["authentication", "radius", "dynamic-author", "key", coa_key])
nas_id = "VyOS-PPPoE"
nas_ip = "7.7.7.7"
self.set(["authentication", "radius", "nas-identifier", nas_id])
self.set(["authentication", "radius", "nas-ip-address", nas_ip])
source_address = "1.2.3.4"
self.set(["authentication", "radius", "source-address", source_address])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False)
conf.read(self._config_file)
# basic verification
self.verify(conf)
# check auth
self.assertTrue(conf["radius"].getboolean("verbose"))
self.assertEqual(conf["radius"]["acct-timeout"], acct_timeout)
self.assertEqual(conf["radius"]["acct-interim-interval"], acct_interim_interval)
self.assertEqual(conf["radius"]["acct-interim-jitter"], acct_interim_jitter)
self.assertEqual(conf["radius"]["timeout"], "3")
self.assertEqual(conf["radius"]["max-try"], "3")
self.assertEqual(
conf["radius"]["dae-server"], f"{coa_server}:1700,{coa_key}"
)
self.assertEqual(conf["radius"]["nas-identifier"], nas_id)
self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip)
self.assertEqual(conf["radius"]["bind"], source_address)
server = conf["radius"]["server"].split(",")
self.assertEqual(radius_server, server[0])
self.assertEqual(radius_key, server[1])
self.assertEqual(f"auth-port={radius_port}", server[2])
self.assertEqual(f"acct-port={radius_port_acc}", server[3])
self.assertEqual(f"req-limit=0", server[4])
self.assertEqual(f"fail-time=0", server[5])
#
# Disable Radius Accounting
#
self.delete(
["authentication", "radius", "server", radius_server, "acct-port"]
)
self.set(
[
"authentication",
"radius",
"server",
radius_server,
"disable-accounting",
]
)
# commit changes
self.cli_commit()
conf.read(self._config_file)
server = conf["radius"]["server"].split(",")
self.assertEqual(radius_server, server[0])
self.assertEqual(radius_key, server[1])
self.assertEqual(f"auth-port={radius_port}", server[2])
self.assertEqual(f"acct-port=0", server[3])
self.assertEqual(f"req-limit=0", server[4])
self.assertEqual(f"fail-time=0", server[5])
def test_accel_ipv4_pool(self):
self.basic_config(is_gateway=False, is_client_pool=False)
gateway = "192.0.2.1"
subnet = "172.16.0.0/24"
first_pool = "POOL1"
second_pool = "POOL2"
range = "192.0.2.10-192.0.2.20"
range_config = "192.0.2.10-20"
self.set(["gateway-address", gateway])
self.set(["client-ip-pool", first_pool, "range", subnet])
self.set(["client-ip-pool", first_pool, "next-pool", second_pool])
self.set(["client-ip-pool", second_pool, "range", range])
self.set(["default-pool", first_pool])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False)
conf.read(self._config_file)
self.assertEqual(
f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"]
)
self.assertEqual(second_pool, conf["ip-pool"][f"{range_config},name"])
self.assertEqual(gateway, conf["ip-pool"]["gw-ip-address"])
self.assertEqual(first_pool, conf[self._protocol_section]["ip-pool"])
def test_accel_next_pool(self):
# T5099 required specific order
self.basic_config(is_gateway=False, is_client_pool=False)
gateway = "192.0.2.1"
first_pool = "VyOS-pool1"
first_subnet = "192.0.2.0/25"
second_pool = "Vyos-pool2"
second_subnet = "203.0.113.0/25"
third_pool = "Vyos-pool3"
third_subnet = "198.51.100.0/24"
self.set(["gateway-address", gateway])
self.set(["client-ip-pool", first_pool, "range", first_subnet])
self.set(["client-ip-pool", first_pool, "next-pool", second_pool])
self.set(["client-ip-pool", second_pool, "range", second_subnet])
self.set(["client-ip-pool", second_pool, "next-pool", third_pool])
self.set(["client-ip-pool", third_pool, "range", third_subnet])
# commit changes
self.cli_commit()
config = self.getConfig("ip-pool")
pool_config = f"""gw-ip-address={gateway}
{third_subnet},name={third_pool}
{second_subnet},name={second_pool},next={third_pool}
{first_subnet},name={first_pool},next={second_pool}"""
self.assertIn(pool_config, config)
def test_accel_ipv6_pool(self):
# Test configuration of IPv6 client pools
self.basic_config(is_gateway=False, is_client_pool=False)
# Enable IPv6
allow_ipv6 = 'allow'
self.set(['ppp-options', 'ipv6', allow_ipv6])
pool_name = 'ipv6_test_pool'
prefix_1 = '2001:db8:fffe::/56'
prefix_mask = '64'
prefix_2 = '2001:db8:ffff::/56'
client_prefix_1 = f'{prefix_1},{prefix_mask}'
client_prefix_2 = f'{prefix_2},{prefix_mask}'
self.set(
['client-ipv6-pool', pool_name, 'prefix', prefix_1, 'mask',
prefix_mask])
self.set(
['client-ipv6-pool', pool_name, 'prefix', prefix_2, 'mask',
prefix_mask])
delegate_1_prefix = '2001:db8:fff1::/56'
delegate_2_prefix = '2001:db8:fff2::/56'
delegate_mask = '64'
self.set(
['client-ipv6-pool', pool_name, 'delegate', delegate_1_prefix,
'delegation-prefix', delegate_mask])
self.set(
['client-ipv6-pool', pool_name, 'delegate', delegate_2_prefix,
'delegation-prefix', delegate_mask])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=',
strict=False)
conf.read(self._config_file)
for tmp in ['ipv6pool', 'ipv6_nd', 'ipv6_dhcp']:
self.assertEqual(conf['modules'][tmp], None)
self.assertEqual(conf['ppp']['ipv6'], allow_ipv6)
config = self.getConfig("ipv6-pool")
pool_config = f"""{client_prefix_1},name={pool_name}
{client_prefix_2},name={pool_name}
delegate={delegate_1_prefix},{delegate_mask},name={pool_name}
delegate={delegate_2_prefix},{delegate_mask},name={pool_name}"""
self.assertIn(pool_config, config)

File Metadata

Mime Type
text/x-script.python
Expires
Mon, Dec 15, 5:35 PM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3094889
Default Alt Text
base_accel_ppp_test.py (17 KB)

Event Timeline