Page MenuHomeVyOS Platform

base_accel_ppp_test.py
No OneTemporary

Size
8 KB
Referenced Files
None
Subscribers
None

base_accel_ppp_test.py

# Copyright (C) 2020-2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from configparser import ConfigParser
from vyos.configsession import ConfigSession
from vyos.configsession import ConfigSessionError
from vyos.template import is_ipv4
from vyos.utils.system import get_half_cpus
from vyos.utils.process import process_named_running
from vyos.utils.process import cmd
class BasicAccelPPPTest:
class TestCase(VyOSUnitTestSHIM.TestCase):
@classmethod
def setUpClass(cls):
cls._process_name = 'accel-pppd'
super(BasicAccelPPPTest.TestCase, cls).setUpClass()
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls.cli_delete(cls, cls._base_path)
def setUp(self):
self._gateway = '192.0.2.1'
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
self.cli_delete(self._base_path)
def tearDown(self):
# Check for running process
self.assertTrue(process_named_running(self._process_name))
self.cli_delete(self._base_path)
self.cli_commit()
# Check for running process
self.assertFalse(process_named_running(self._process_name))
def set(self, path):
self.cli_set(self._base_path + path)
def delete(self, path):
self.cli_delete(self._base_path + path)
def basic_config(self):
# PPPoE local auth mode requires local users to be configured!
self.set(['authentication', 'local-users', 'username', 'vyos', 'password', 'vyos'])
self.set(['authentication', 'mode', 'local'])
self.set(['gateway-address', self._gateway])
def verify(self, conf):
self.assertEqual(conf['core']['thread-count'], str(get_half_cpus()))
def test_accel_name_servers(self):
# Verify proper Name-Server configuration for IPv4 and IPv6
self.basic_config()
nameserver = ['192.0.2.1', '192.0.2.2', '2001:db8::1']
for ns in nameserver:
self.set(['name-server', ns])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
conf.read(self._config_file)
# IPv4 and IPv6 nameservers must be checked individually
for ns in nameserver:
if is_ipv4(ns):
self.assertIn(ns, [conf['dns']['dns1'], conf['dns']['dns2']])
else:
self.assertEqual(conf['ipv6-dns'][ns], None)
def test_accel_local_authentication(self):
# Test configuration of local authentication
self.basic_config()
# upload / download limit
user = 'test'
password = 'test2'
static_ip = '100.100.100.101'
upload = '5000'
download = '10000'
self.set(['authentication', 'local-users', 'username', user, 'password', password])
self.set(['authentication', 'local-users', 'username', user, 'static-ip', static_ip])
self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'upload', upload])
# upload rate-limit requires also download rate-limit
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.set(['authentication', 'local-users', 'username', user, 'rate-limit', 'download', download])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
conf.read(self._config_file)
# check proper path to chap-secrets file
self.assertEqual(conf['chap-secrets']['chap-secrets'], self._chap_secrets)
# basic verification
self.verify(conf)
# check local users
tmp = cmd(f'sudo cat {self._chap_secrets}')
regex = f'{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}'
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
# Check local-users default value(s)
self.delete(['authentication', 'local-users', 'username', user, 'static-ip'])
# commit changes
self.cli_commit()
# check local users
tmp = cmd(f'sudo cat {self._chap_secrets}')
regex = f'{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}'
tmp = re.findall(regex, tmp)
self.assertTrue(tmp)
def test_accel_radius_authentication(self):
# Test configuration of RADIUS authentication for PPPoE server
self.basic_config()
radius_server = '192.0.2.22'
radius_key = 'secretVyOS'
radius_port = '2000'
radius_port_acc = '3000'
self.set(['authentication', 'mode', 'radius'])
self.set(['authentication', 'radius', 'server', radius_server, 'key', radius_key])
self.set(['authentication', 'radius', 'server', radius_server, 'port', radius_port])
self.set(['authentication', 'radius', 'server', radius_server, 'acct-port', radius_port_acc])
coa_server = '4.4.4.4'
coa_key = 'testCoA'
self.set(['authentication', 'radius', 'dynamic-author', 'server', coa_server])
self.set(['authentication', 'radius', 'dynamic-author', 'key', coa_key])
nas_id = 'VyOS-PPPoE'
nas_ip = '7.7.7.7'
self.set(['authentication', 'radius', 'nas-identifier', nas_id])
self.set(['authentication', 'radius', 'nas-ip-address', nas_ip])
source_address = '1.2.3.4'
self.set(['authentication', 'radius', 'source-address', source_address])
# commit changes
self.cli_commit()
# Validate configuration values
conf = ConfigParser(allow_no_value=True, delimiters='=')
conf.read(self._config_file)
# basic verification
self.verify(conf)
# check auth
self.assertTrue(conf['radius'].getboolean('verbose'))
self.assertEqual(conf['radius']['acct-timeout'], '3')
self.assertEqual(conf['radius']['timeout'], '3')
self.assertEqual(conf['radius']['max-try'], '3')
self.assertEqual(conf['radius']['dae-server'], f'{coa_server}:1700,{coa_key}')
self.assertEqual(conf['radius']['nas-identifier'], nas_id)
self.assertEqual(conf['radius']['nas-ip-address'], nas_ip)
self.assertEqual(conf['radius']['bind'], source_address)
server = conf['radius']['server'].split(',')
self.assertEqual(radius_server, server[0])
self.assertEqual(radius_key, server[1])
self.assertEqual(f'auth-port={radius_port}', server[2])
self.assertEqual(f'acct-port={radius_port_acc}', server[3])
self.assertEqual(f'req-limit=0', server[4])
self.assertEqual(f'fail-time=0', server[5])
#
# Disable Radius Accounting
#
self.delete(['authentication', 'radius', 'server', radius_server, 'acct-port'])
self.set(['authentication', 'radius', 'server', radius_server, 'disable-accounting'])
# commit changes
self.cli_commit()
conf.read(self._config_file)
server = conf['radius']['server'].split(',')
self.assertEqual(radius_server, server[0])
self.assertEqual(radius_key, server[1])
self.assertEqual(f'auth-port={radius_port}', server[2])
self.assertEqual(f'acct-port=0', server[3])
self.assertEqual(f'req-limit=0', server[4])
self.assertEqual(f'fail-time=0', server[5])

File Metadata

Mime Type
text/x-script.python
Expires
Mon, Dec 15, 5:35 PM (1 d, 20 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3076473
Default Alt Text
base_accel_ppp_test.py (8 KB)

Event Timeline