diff --git a/data/templates/accel-ppp/config_ip_pool.j2 b/data/templates/accel-ppp/config_ip_pool.j2 index c567236a4..6ac04e1a1 100644 --- a/data/templates/accel-ppp/config_ip_pool.j2 +++ b/data/templates/accel-ppp/config_ip_pool.j2 @@ -1,22 +1,28 @@ {% if ordered_named_pools is vyos_defined %} [ip-pool] {% if gateway_address is vyos_defined %} {% if server_type == 'ipoe' %} {% for gw in gateway_address %} {% set host_address, _ = gw.split('/') %} gw-ip-address={{ host_address }} {% endfor %} {% else %} gw-ip-address={{ gateway_address }} {% endif %} {% endif %} {% for pool in ordered_named_pools %} {% for pool_name, pool_config in pool.items() %} +{% set iprange_str = pool_config.range %} +{% set iprange_list = pool_config.range.split('-') %} +{% if iprange_list | length == 2 %} +{% set last_ip_oct = iprange_list[1].split('.') %} +{% set iprange_str = iprange_list[0] + '-' + last_ip_oct[last_ip_oct | length - 1] %} +{% endif %} {% if pool_config.next_pool is vyos_defined %} -{{ pool_config.range }},name={{ pool_name }},next={{ pool_config.next_pool }} +{{ iprange_str }},name={{ pool_name }},next={{ pool_config.next_pool }} {% else %} -{{ pool_config.range }},name={{ pool_name }} +{{ iprange_str }},name={{ pool_name }} {% endif %} {% endfor %} {% endfor %} {% endif %} \ No newline at end of file diff --git a/smoketest/scripts/cli/base_accel_ppp_test.py b/smoketest/scripts/cli/base_accel_ppp_test.py index 32624719f..fad765c0d 100644 --- a/smoketest/scripts/cli/base_accel_ppp_test.py +++ b/smoketest/scripts/cli/base_accel_ppp_test.py @@ -1,419 +1,420 @@ # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import unittest from base_vyostest_shim import VyOSUnitTestSHIM from configparser import ConfigParser from vyos.configsession import ConfigSession from vyos.configsession import ConfigSessionError from vyos.template import is_ipv4 from vyos.utils.system import get_half_cpus from vyos.utils.process import process_named_running from vyos.utils.process import cmd class BasicAccelPPPTest: class TestCase(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): cls._process_name = "accel-pppd" super(BasicAccelPPPTest.TestCase, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, cls._base_path) def setUp(self): self._gateway = "192.0.2.1" # ensure we can also run this test on a live system - so lets clean # out the current configuration :) self.cli_delete(self._base_path) def tearDown(self): # Check for running process self.assertTrue(process_named_running(self._process_name)) self.cli_delete(self._base_path) self.cli_commit() # Check for running process self.assertFalse(process_named_running(self._process_name)) def set(self, path): self.cli_set(self._base_path + path) def delete(self, path): self.cli_delete(self._base_path + path) def basic_protocol_specific_config(self): """ An astract method. Initialize protocol scpecific configureations. """ self.assertFalse(True, msg="Function must be defined") def initial_auth_config(self): """ Initialization of default authentication for all protocols """ self.set( [ "authentication", "local-users", "username", "vyos", "password", "vyos", ] ) self.set(["authentication", "mode", "local"]) def initial_gateway_config(self): """ Initialization of default gateway """ self.set(["gateway-address", self._gateway]) def initial_pool_config(self): """ Initialization of default client ip pool """ first_pool = "SIMPLE-POOL" self.set(["client-ip-pool", first_pool, "range", "192.0.2.0/24"]) self.set(["default-pool", first_pool]) def basic_config(self, is_auth=True, is_gateway=True, is_client_pool=True): """ Initialization of basic configuration :param is_auth: authentication initialization :type is_auth: bool :param is_gateway: gateway initialization :type is_gateway: bool :param is_client_pool: client ip pool initialization :type is_client_pool: bool """ self.basic_protocol_specific_config() if is_auth: self.initial_auth_config() if is_gateway: self.initial_gateway_config() if is_client_pool: self.initial_pool_config() def getConfig(self, start, end="cli"): """ Return part of configuration from line where the first injection of start keyword to the line where the first injection of end keyowrd :param start: start keyword :type start: str :param end: end keyword :type end: str :return: part of config :rtype: str """ command = f'cat {self._config_file} | sed -n "/^\[{start}/,/^\[{end}/p"' out = cmd(command) return out def verify(self, conf): self.assertEqual(conf["core"]["thread-count"], str(get_half_cpus())) def test_accel_name_servers(self): # Verify proper Name-Server configuration for IPv4 and IPv6 self.basic_config() nameserver = ["192.0.2.1", "192.0.2.2", "2001:db8::1"] for ns in nameserver: self.set(["name-server", ns]) # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # IPv4 and IPv6 nameservers must be checked individually for ns in nameserver: if is_ipv4(ns): self.assertIn(ns, [conf["dns"]["dns1"], conf["dns"]["dns2"]]) else: self.assertEqual(conf["ipv6-dns"][ns], None) def test_accel_local_authentication(self): # Test configuration of local authentication self.basic_config() # upload / download limit user = "test" password = "test2" static_ip = "100.100.100.101" upload = "5000" download = "10000" self.set( [ "authentication", "local-users", "username", user, "password", password, ] ) self.set( [ "authentication", "local-users", "username", user, "static-ip", static_ip, ] ) self.set( [ "authentication", "local-users", "username", user, "rate-limit", "upload", upload, ] ) # upload rate-limit requires also download rate-limit with self.assertRaises(ConfigSessionError): self.cli_commit() self.set( [ "authentication", "local-users", "username", user, "rate-limit", "download", download, ] ) # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) # basic verification self.verify(conf) # check local users tmp = cmd(f"sudo cat {self._chap_secrets}") regex = f"{user}\s+\*\s+{password}\s+{static_ip}\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) # Check local-users default value(s) self.delete( ["authentication", "local-users", "username", user, "static-ip"] ) # commit changes self.cli_commit() # check local users tmp = cmd(f"sudo cat {self._chap_secrets}") regex = f"{user}\s+\*\s+{password}\s+\*\s+{download}/{upload}" tmp = re.findall(regex, tmp) self.assertTrue(tmp) def test_accel_radius_authentication(self): # Test configuration of RADIUS authentication for PPPoE server self.basic_config() radius_server = "192.0.2.22" radius_key = "secretVyOS" radius_port = "2000" radius_port_acc = "3000" self.set(["authentication", "mode", "radius"]) self.set( ["authentication", "radius", "server", radius_server, "key", radius_key] ) self.set( [ "authentication", "radius", "server", radius_server, "port", radius_port, ] ) self.set( [ "authentication", "radius", "server", radius_server, "acct-port", radius_port_acc, ] ) coa_server = "4.4.4.4" coa_key = "testCoA" self.set( ["authentication", "radius", "dynamic-author", "server", coa_server] ) self.set(["authentication", "radius", "dynamic-author", "key", coa_key]) nas_id = "VyOS-PPPoE" nas_ip = "7.7.7.7" self.set(["authentication", "radius", "nas-identifier", nas_id]) self.set(["authentication", "radius", "nas-ip-address", nas_ip]) source_address = "1.2.3.4" self.set(["authentication", "radius", "source-address", source_address]) # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # basic verification self.verify(conf) # check auth self.assertTrue(conf["radius"].getboolean("verbose")) self.assertEqual(conf["radius"]["acct-timeout"], "3") self.assertEqual(conf["radius"]["timeout"], "3") self.assertEqual(conf["radius"]["max-try"], "3") self.assertEqual( conf["radius"]["dae-server"], f"{coa_server}:1700,{coa_key}" ) self.assertEqual(conf["radius"]["nas-identifier"], nas_id) self.assertEqual(conf["radius"]["nas-ip-address"], nas_ip) self.assertEqual(conf["radius"]["bind"], source_address) server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) self.assertEqual(f"auth-port={radius_port}", server[2]) self.assertEqual(f"acct-port={radius_port_acc}", server[3]) self.assertEqual(f"req-limit=0", server[4]) self.assertEqual(f"fail-time=0", server[5]) # # Disable Radius Accounting # self.delete( ["authentication", "radius", "server", radius_server, "acct-port"] ) self.set( [ "authentication", "radius", "server", radius_server, "disable-accounting", ] ) # commit changes self.cli_commit() conf.read(self._config_file) server = conf["radius"]["server"].split(",") self.assertEqual(radius_server, server[0]) self.assertEqual(radius_key, server[1]) self.assertEqual(f"auth-port={radius_port}", server[2]) self.assertEqual(f"acct-port=0", server[3]) self.assertEqual(f"req-limit=0", server[4]) self.assertEqual(f"fail-time=0", server[5]) def test_accel_ipv4_pool(self): """ Test accel-ppp IPv4 pool """ self.basic_config(is_gateway=False, is_client_pool=False) gateway = "192.0.2.1" subnet = "172.16.0.0/24" first_pool = "POOL1" second_pool = "POOL2" range = "192.0.2.10-192.0.2.20" + range_config = "192.0.2.10-20" self.set(["gateway-address", gateway]) self.set(["client-ip-pool", first_pool, "range", subnet]) self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) self.set(["client-ip-pool", second_pool, "range", range]) self.set(["default-pool", first_pool]) # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) self.assertEqual( f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] ) - self.assertEqual(second_pool, conf["ip-pool"][f"{range},name"]) + self.assertEqual(second_pool, conf["ip-pool"][f"{range_config},name"]) self.assertEqual(gateway, conf["ip-pool"]["gw-ip-address"]) self.assertEqual(first_pool, conf[self._protocol_section]["ip-pool"]) def test_accel_next_pool(self): """ T5099 required specific order """ self.basic_config(is_gateway=False, is_client_pool=False) gateway = "192.0.2.1" first_pool = "VyOS-pool1" first_subnet = "192.0.2.0/25" second_pool = "Vyos-pool2" second_subnet = "203.0.113.0/25" third_pool = "Vyos-pool3" third_subnet = "198.51.100.0/24" self.set(["gateway-address", gateway]) self.set(["client-ip-pool", first_pool, "range", first_subnet]) self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) self.set(["client-ip-pool", second_pool, "range", second_subnet]) self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) self.set(["client-ip-pool", third_pool, "range", third_subnet]) # commit changes self.cli_commit() config = self.getConfig("ip-pool") pool_config = f"""gw-ip-address={gateway} {third_subnet},name={third_pool} {second_subnet},name={second_pool},next={third_pool} {first_subnet},name={first_pool},next={second_pool}""" self.assertIn(pool_config, config) diff --git a/smoketest/scripts/cli/test_service_ipoe-server.py b/smoketest/scripts/cli/test_service_ipoe-server.py index 358668e0d..6e95b3bd1 100755 --- a/smoketest/scripts/cli/test_service_ipoe-server.py +++ b/smoketest/scripts/cli/test_service_ipoe-server.py @@ -1,192 +1,193 @@ #!/usr/bin/env python3 # # Copyright (C) 2022-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import unittest from collections import OrderedDict from base_accel_ppp_test import BasicAccelPPPTest from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from configparser import ConfigParser from configparser import RawConfigParser ac_name = "ACN" interface = "eth0" class MultiOrderedDict(OrderedDict): # Accel-ppp has duplicate keys in config file (gw-ip-address) # This class is used to define dictionary which can contain multiple values # in one key. def __setitem__(self, key, value): if isinstance(value, list) and key in self: self[key].extend(value) else: super(OrderedDict, self).__setitem__(key, value) class TestServiceIPoEServer(BasicAccelPPPTest.TestCase): @classmethod def setUpClass(cls): cls._base_path = ["service", "ipoe-server"] cls._config_file = "/run/accel-pppd/ipoe.conf" cls._chap_secrets = "/run/accel-pppd/ipoe.chap-secrets" cls._protocol_section = "ipoe" # call base-classes classmethod super(TestServiceIPoEServer, cls).setUpClass() def verify(self, conf): super().verify(conf) # Validate configuration values accel_modules = list(conf["modules"].keys()) self.assertIn("log_syslog", accel_modules) self.assertIn("ipoe", accel_modules) self.assertIn("shaper", accel_modules) self.assertIn("ipv6pool", accel_modules) self.assertIn("ipv6_nd", accel_modules) self.assertIn("ipv6_dhcp", accel_modules) self.assertIn("ippool", accel_modules) def initial_gateway_config(self): self._gateway = "192.0.2.1/24" super().initial_gateway_config() def initial_auth_config(self): self.set(["authentication", "mode", "noauth"]) def basic_protocol_specific_config(self): self.set(["interface", interface, "client-subnet", "192.168.0.0/24"]) def test_accel_local_authentication(self): mac_address = "08:00:27:2f:d8:06" self.set(["authentication", "interface", interface, "mac", mac_address]) self.set(["authentication", "mode", "local"]) # No IPoE interface configured with self.assertRaises(ConfigSessionError): self.cli_commit() # Test configuration of local authentication for PPPoE server self.basic_config() # Rewrite authentication from basic_config self.set(["authentication", "interface", interface, "mac", mac_address]) self.set(["authentication", "mode", "local"]) # commit changes self.cli_commit() # Validate configuration values conf = ConfigParser(allow_no_value=True, delimiters="=", strict=False) conf.read(self._config_file) # check proper path to chap-secrets file self.assertEqual(conf["chap-secrets"]["chap-secrets"], self._chap_secrets) accel_modules = list(conf["modules"].keys()) self.assertIn("chap-secrets", accel_modules) # basic verification self.verify(conf) # check local users tmp = cmd(f"sudo cat {self._chap_secrets}") regex = f"{interface}\s+\*\s+{mac_address}\s+\*" tmp = re.findall(regex, tmp) self.assertTrue(tmp) def test_accel_ipv4_pool(self): self.basic_config(is_gateway=False, is_client_pool=False) gateway = ["172.16.0.1/25", "192.0.2.1/24"] subnet = "172.16.0.0/24" first_pool = "POOL1" second_pool = "POOL2" range = "192.0.2.10-192.0.2.20" + range_config = "192.0.2.10-20" for gw in gateway: self.set(["gateway-address", gw]) self.set(["client-ip-pool", first_pool, "range", subnet]) self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) self.set(["client-ip-pool", second_pool, "range", range]) self.set(["default-pool", first_pool]) # commit changes self.cli_commit() # Validate configuration values conf = RawConfigParser( allow_no_value=True, delimiters="=", strict=False, dict_type=MultiOrderedDict, ) conf.read(self._config_file) self.assertIn( f"{first_pool},next={second_pool}", conf["ip-pool"][f"{subnet},name"] ) - self.assertIn(second_pool, conf["ip-pool"][f"{range},name"]) + self.assertIn(second_pool, conf["ip-pool"][f"{range_config},name"]) gw_pool_config_list = conf.get("ip-pool", "gw-ip-address") gw_ipoe_config_list = conf.get(self._protocol_section, "gw-ip-address") for gw in gateway: self.assertIn(gw.split("/")[0], gw_pool_config_list) self.assertIn(gw, gw_ipoe_config_list) self.assertIn(first_pool, conf[self._protocol_section]["ip-pool"]) def test_accel_next_pool(self): self.basic_config(is_gateway=False, is_client_pool=False) first_pool = "VyOS-pool1" first_subnet = "192.0.2.0/25" first_gateway = "192.0.2.1/24" second_pool = "Vyos-pool2" second_subnet = "203.0.113.0/25" second_gateway = "203.0.113.1/24" third_pool = "Vyos-pool3" third_subnet = "198.51.100.0/24" third_gateway = "198.51.100.1/24" self.set(["gateway-address", f"{first_gateway}"]) self.set(["gateway-address", f"{second_gateway}"]) self.set(["gateway-address", f"{third_gateway}"]) self.set(["client-ip-pool", first_pool, "range", first_subnet]) self.set(["client-ip-pool", first_pool, "next-pool", second_pool]) self.set(["client-ip-pool", second_pool, "range", second_subnet]) self.set(["client-ip-pool", second_pool, "next-pool", third_pool]) self.set(["client-ip-pool", third_pool, "range", third_subnet]) # commit changes self.cli_commit() config = self.getConfig("ip-pool") # T5099 required specific order pool_config = f"""gw-ip-address={first_gateway.split('/')[0]} gw-ip-address={second_gateway.split('/')[0]} gw-ip-address={third_gateway.split('/')[0]} {third_subnet},name={third_pool} {second_subnet},name={second_pool},next={third_pool} {first_subnet},name={first_pool},next={second_pool}""" self.assertIn(pool_config, config) if __name__ == "__main__": unittest.main(verbosity=2)