diff --git a/interface-definitions/system_login.xml.in b/interface-definitions/system_login.xml.in index 672c4afc8..a59f54005 100644 --- a/interface-definitions/system_login.xml.in +++ b/interface-definitions/system_login.xml.in @@ -1,302 +1,303 @@ <?xml version="1.0"?> <interfaceDefinition> <node name="system"> <children> <node name="login" owner="${vyos_conf_scripts_dir}/system_login.py"> <properties> <help>System User Login Configuration</help> <priority>400</priority> </properties> <children> <tagNode name="user"> <properties> <help>Local user account information</help> <constraint> #include <include/constraint/login-username.xml.i> </constraint> <constraintErrorMessage>Username contains illegal characters or\nexceeds 100 character limitation.</constraintErrorMessage> </properties> <children> <node name="authentication"> <properties> <help>Authentication settings</help> </properties> <children> <leafNode name="encrypted-password"> <properties> <help>Encrypted password</help> <constraint> <regex>(\*|\!)</regex> <regex>[a-zA-Z0-9\.\/]{13}</regex> <regex>\$1\$[a-zA-Z0-9\./]*\$[a-zA-Z0-9\./]{22}</regex> <regex>\$5\$(rounds=[0-9]+\$)?[a-zA-Z0-9\./]*\$[a-zA-Z0-9\./]{43}</regex> <regex>\$6\$(rounds=[0-9]+\$)?[a-zA-Z0-9\./]*\$[a-zA-Z0-9\./]{86}</regex> </constraint> <constraintErrorMessage>Invalid encrypted password for $VAR(../../@).</constraintErrorMessage> </properties> <defaultValue>!</defaultValue> </leafNode> <node name="otp"> <properties> <help>One-Time-Pad (two-factor) authentication parameters</help> </properties> <children> <leafNode name="rate-limit"> <properties> <help>Limit number of logins (rate-limit) per rate-time</help> <valueHelp> <format>u32:1-10</format> <description>Number of attempts</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-10"/> </constraint> <constraintErrorMessage>Number of login attempts must me between 1 and 10</constraintErrorMessage> </properties> <defaultValue>3</defaultValue> </leafNode> <leafNode name="rate-time"> <properties> <help>Limit number of logins (rate-limit) per rate-time</help> <valueHelp> <format>u32:15-600</format> <description>Time interval</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 15-600"/> </constraint> <constraintErrorMessage>Rate limit time interval must be between 15 and 600 seconds</constraintErrorMessage> </properties> <defaultValue>30</defaultValue> </leafNode> <leafNode name="window-size"> <properties> <help>Set window of concurrently valid codes</help> <valueHelp> <format>u32:1-21</format> <description>Window size</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-21"/> </constraint> <constraintErrorMessage>Window of concurrently valid codes must be between 1 and 21</constraintErrorMessage> </properties> <defaultValue>3</defaultValue> </leafNode> <leafNode name="key"> <properties> <help>Key/secret the token algorithm (see RFC4226)</help> <valueHelp> <format>txt</format> <description>Base32 encoded key/token</description> </valueHelp> <constraint> <regex>[a-zA-Z2-7]{26,10000}</regex> </constraint> <constraintErrorMessage>Key must only include base32 characters and be at least 26 characters long</constraintErrorMessage> </properties> </leafNode> </children> </node> <leafNode name="plaintext-password"> <properties> <help>Plaintext password used for encryption</help> </properties> </leafNode> <tagNode name="public-keys"> <properties> <help>Remote access public keys</help> <valueHelp> <format>txt</format> <description>Key identifier used by ssh-keygen (usually of form user@host)</description> </valueHelp> </properties> <children> <leafNode name="key"> <properties> <help>Public key value (Base64 encoded)</help> <constraint> <validator name="base64"/> </constraint> </properties> </leafNode> <leafNode name="options"> <properties> <help>Optional public key options</help> </properties> </leafNode> <leafNode name="type"> <properties> <help>SSH public key type</help> <completionHelp> <list>ssh-dss ssh-rsa ecdsa-sha2-nistp256 ecdsa-sha2-nistp384 ecdsa-sha2-nistp521 ssh-ed25519 sk-ecdsa-sha2-nistp256@openssh.com sk-ssh-ed25519@openssh.com</list> </completionHelp> <valueHelp> <format>ssh-dss</format> <description>Digital Signature Algorithm (DSA) key support</description> </valueHelp> <valueHelp> <format>ssh-rsa</format> <description>Key pair based on RSA algorithm</description> </valueHelp> <valueHelp> <format>ecdsa-sha2-nistp256</format> <description>Elliptic Curve DSA with NIST P-256 curve</description> </valueHelp> <valueHelp> <format>ecdsa-sha2-nistp384</format> <description>Elliptic Curve DSA with NIST P-384 curve</description> </valueHelp> <valueHelp> <format>ecdsa-sha2-nistp521</format> <description>Elliptic Curve DSA with NIST P-521 curve</description> </valueHelp> <valueHelp> <format>ssh-ed25519</format> <description>Edwards-curve DSA with elliptic curve 25519</description> </valueHelp> <valueHelp> <format>sk-ecdsa-sha2-nistp256@openssh.com</format> <description>Elliptic Curve DSA security key</description> </valueHelp> <valueHelp> <format>sk-ssh-ed25519@openssh.com</format> <description>Elliptic curve 25519 security key</description> </valueHelp> <constraint> <regex>(ssh-dss|ssh-rsa|ecdsa-sha2-nistp256|ecdsa-sha2-nistp384|ecdsa-sha2-nistp521|ssh-ed25519|sk-ecdsa-sha2-nistp256@openssh.com|sk-ssh-ed25519@openssh.com)</regex> </constraint> </properties> </leafNode> </children> </tagNode> </children> </node> + #include <include/generic-disable-node.xml.i> <leafNode name="full-name"> <properties> <help>Full name of the user (use quotes for names with spaces)</help> <constraint> <regex>[^:]*</regex> </constraint> <constraintErrorMessage>Cannot use ':' in full name</constraintErrorMessage> </properties> </leafNode> <leafNode name="home-directory"> <properties> <help>Home directory</help> <valueHelp> <format>txt</format> <description>Path to home directory</description> </valueHelp> <constraint> <regex>\/$|(\/[a-zA-Z_0-9-.]+)+</regex> </constraint> </properties> </leafNode> </children> </tagNode> #include <include/radius-server-ipv4-ipv6.xml.i> <node name="radius"> <children> <tagNode name="server"> <children> #include <include/radius-timeout.xml.i> <leafNode name="priority"> <properties> <help>Server priority</help> <valueHelp> <format>u32:1-255</format> <description>Server priority</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-255"/> </constraint> </properties> <defaultValue>255</defaultValue> </leafNode> </children> </tagNode> #include <include/interface/vrf.xml.i> </children> </node> <node name="tacacs"> <properties> <help>TACACS+ based user authentication</help> </properties> <children> <tagNode name="server"> <properties> <help>TACACS+ server configuration</help> <valueHelp> <format>ipv4</format> <description>TACACS+ server IPv4 address</description> </valueHelp> <constraint> <validator name="ipv4-address"/> </constraint> </properties> <children> #include <include/generic-disable-node.xml.i> #include <include/radius-server-key.xml.i> #include <include/port-number.xml.i> <leafNode name="port"> <defaultValue>49</defaultValue> </leafNode> </children> </tagNode> <leafNode name="security-mode"> <properties> <help>Security mode for TACACS+ authentication</help> <completionHelp> <list>mandatory optional</list> </completionHelp> <valueHelp> <format>mandatory</format> <description>Deny access immediately if TACACS+ answers with REJECT</description> </valueHelp> <valueHelp> <format>optional</format> <description>Pass to the next authentication method if TACACS+ answers with REJECT</description> </valueHelp> <constraint> <regex>(mandatory|optional)</regex> </constraint> </properties> <defaultValue>optional</defaultValue> </leafNode> #include <include/source-address-ipv4.xml.i> #include <include/radius-timeout.xml.i> #include <include/interface/vrf.xml.i> </children> </node> <leafNode name="max-login-session"> <properties> <help>Maximum number of all login sessions</help> <valueHelp> <format>u32:1-65536</format> <description>Maximum number of all login sessions</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 1-65536"/> </constraint> <constraintErrorMessage>Maximum logins must be between 1 and 65536</constraintErrorMessage> </properties> </leafNode> <leafNode name="timeout"> <properties> <help>Session timeout</help> <valueHelp> <format>u32:5-604800</format> <description>Session timeout in seconds</description> </valueHelp> <constraint> <validator name="numeric" argument="--range 5-604800"/> </constraint> <constraintErrorMessage>Timeout must be between 5 and 604800 seconds</constraintErrorMessage> </properties> </leafNode> </children> </node> </children> </node> </interfaceDefinition> diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index 195b127a4..d93ad952f 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -1,325 +1,340 @@ #!/usr/bin/env python3 # -# Copyright (C) 2019-2023 VyOS maintainers and contributors +# Copyright (C) 2019-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re -import platform import unittest import paramiko from base_vyostest_shim import VyOSUnitTestSHIM +from gzip import GzipFile from subprocess import Popen, PIPE from pwd import getpwall from time import sleep from vyos.configsession import ConfigSessionError from vyos.utils.process import cmd from vyos.utils.file import read_file from vyos.template import inc_ip base_path = ['system', 'login'] users = ['vyos1', 'vyos-roxx123', 'VyOS-123_super.Nice'] ssh_pubkey = """ AAAAB3NzaC1yc2EAAAADAQABAAABgQD0NuhUOEtMIKnUVFIHoFatqX/c4mjerXyF TlXYfVt6Ls2NZZsUSwHbnhK4BKDrPvVZMW/LycjQPzWW6TGtk6UbZP1WqdviQ9hP jsEeKJSTKciMSvQpjBWyEQQPXSKYQC7ryQQilZDqnJgzqwzejKEe+nhhOdBvjuZc uukxjT69E0UmWAwLxzvfiurwiQaC7tG+PwqvtfHOPL3i6yRO2C5ORpFarx8PeGDS IfIXJCr3LoUbLHeuE7T2KaOKQcX0UsWJ4CoCapRLpTVYPDB32BYfgq7cW1Sal1re EGH2PzuXBklinTBgCHA87lHjpwDIAqdmvMj7SXIW9LxazLtP+e37sexE7xEs0cpN l68txdDbY2P2Kbz5mqGFfCvBYKv9V2clM5vyWNy/Xp5TsCis89nn83KJmgFS7sMx pHJz8umqkxy3hfw0K7BRFtjWd63sbOP8Q/SDV7LPaIfIxenA9zv2rY7y+AIqTmSr TTSb0X1zPGxPIRFy5GoGtO9Mm5h4OZk= """ class TestSystemLogin(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestSystemLogin, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration which will break this test cls.cli_delete(cls, base_path + ['radius']) cls.cli_delete(cls, base_path + ['tacacs']) def tearDown(self): # Delete individual users from configuration for user in users: self.cli_delete(base_path + ['user', user]) self.cli_delete(base_path + ['radius']) self.cli_delete(base_path + ['tacacs']) self.cli_commit() # After deletion, a user is not allowed to remain in /etc/passwd usernames = [x[0] for x in getpwall()] for user in users: self.assertNotIn(user, usernames) def test_add_linux_system_user(self): # We are not allowed to re-use a username already taken by the Linux # base system system_user = 'backup' self.cli_set(base_path + ['user', system_user, 'authentication', 'plaintext-password', system_user]) # check validate() - can not add username which exists on the Debian # base system (UID < 1000) with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['user', system_user]) def test_system_login_user(self): # Check if user can be created and we can SSH to localhost self.cli_set(['service', 'ssh', 'port', '22']) for user in users: name = "VyOS Roxx " + user home_dir = "/tmp/" + user self.cli_set(base_path + ['user', user, 'authentication', 'plaintext-password', user]) self.cli_set(base_path + ['user', user, 'full-name', 'VyOS Roxx']) self.cli_set(base_path + ['user', user, 'home-directory', home_dir]) self.cli_commit() for user in users: - cmd = ['su','-', user] - proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + tmp = ['su','-', user] + proc = Popen(tmp, stdin=PIPE, stdout=PIPE, stderr=PIPE) tmp = "{}\nuname -a".format(user) proc.stdin.write(tmp.encode()) proc.stdin.flush() (stdout, stderr) = proc.communicate() # stdout is something like this: # b'Linux LR1.wue3 5.10.61-amd64-vyos #1 SMP Fri Aug 27 08:55:46 UTC 2021 x86_64 GNU/Linux\n' self.assertTrue(len(stdout) > 40) + locked_user = users[0] + # disable the first user in list + self.cli_set(base_path + ['user', locked_user, 'disable']) + self.cli_commit() + # check if account is locked + tmp = cmd(f'sudo passwd -S {locked_user}') + self.assertIn(f'{locked_user} L ', tmp) + + # unlock account + self.cli_delete(base_path + ['user', locked_user, 'disable']) + self.cli_commit() + # check if account is unlocked + tmp = cmd(f'sudo passwd -S {locked_user}') + self.assertIn(f'{locked_user} P ', tmp) + + def test_system_login_otp(self): otp_user = 'otp-test_user' otp_password = 'SuperTestPassword' otp_key = '76A3ZS6HFHBTOK2H4NDHTIVFPQ' self.cli_set(base_path + ['user', otp_user, 'authentication', 'plaintext-password', otp_password]) self.cli_set(base_path + ['user', otp_user, 'authentication', 'otp', 'key', otp_key]) self.cli_commit() # Check if OTP key was written properly tmp = cmd(f'sudo head -1 /home/{otp_user}/.google_authenticator') self.assertIn(otp_key, tmp) self.cli_delete(base_path + ['user', otp_user]) def test_system_user_ssh_key(self): ssh_user = 'ssh-test_user' public_keys = 'vyos_test@domain-foo.com' type = 'ssh-rsa' self.cli_set(base_path + ['user', ssh_user, 'authentication', 'public-keys', public_keys, 'key', ssh_pubkey.replace('\n','')]) # check validate() - missing type for public-key with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['user', ssh_user, 'authentication', 'public-keys', public_keys, 'type', type]) self.cli_commit() # Check that SSH key was written properly tmp = cmd(f'sudo cat /home/{ssh_user}/.ssh/authorized_keys') key = f'{type} ' + ssh_pubkey.replace('\n','') self.assertIn(key, tmp) self.cli_delete(base_path + ['user', ssh_user]) def test_radius_kernel_features(self): # T2886: RADIUS requires some Kernel options to be present - kernel = platform.release() - kernel_config = read_file(f'/boot/config-{kernel}') + kernel_config = GzipFile('/proc/config.gz').read().decode('UTF-8') # T2886 - RADIUS authentication - check for statically compiled options options = ['CONFIG_AUDIT', 'CONFIG_AUDITSYSCALL', 'CONFIG_AUDIT_ARCH'] for option in options: self.assertIn(f'{option}=y', kernel_config) def test_system_login_radius_ipv4(self): # Verify generated RADIUS configuration files radius_key = 'VyOSsecretVyOS' radius_server = '172.16.100.10' radius_source = '127.0.0.1' radius_port = '2000' radius_timeout = '1' self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) self.cli_set(base_path + ['radius', 'source-address', radius_source]) self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) # check validate() - Only one IPv4 source-address supported with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) self.cli_commit() # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') tmp = re.findall(r'\n?{}:{}\s+{}\s+{}\s+{}'.format(radius_server, radius_port, radius_key, radius_timeout, radius_source), pam_radius_auth_conf) self.assertTrue(tmp) # required, static options self.assertIn('priv-lvl 15', pam_radius_auth_conf) self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) # PAM pam_common_account = read_file('/etc/pam.d/common-account') self.assertIn('pam_radius_auth.so', pam_common_account) pam_common_auth = read_file('/etc/pam.d/common-auth') self.assertIn('pam_radius_auth.so', pam_common_auth) pam_common_session = read_file('/etc/pam.d/common-session') self.assertIn('pam_radius_auth.so', pam_common_session) pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) # NSS nsswitch_conf = read_file('/etc/nsswitch.conf') tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) self.assertTrue(tmp) tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) def test_system_login_radius_ipv6(self): # Verify generated RADIUS configuration files radius_key = 'VyOS-VyOS' radius_server = '2001:db8::1' radius_source = '::1' radius_port = '4000' radius_timeout = '4' self.cli_set(base_path + ['radius', 'server', radius_server, 'key', radius_key]) self.cli_set(base_path + ['radius', 'server', radius_server, 'port', radius_port]) self.cli_set(base_path + ['radius', 'server', radius_server, 'timeout', radius_timeout]) self.cli_set(base_path + ['radius', 'source-address', radius_source]) self.cli_set(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) # check validate() - Only one IPv4 source-address supported with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_delete(base_path + ['radius', 'source-address', inc_ip(radius_source, 1)]) self.cli_commit() # this file must be read with higher permissions pam_radius_auth_conf = cmd('sudo cat /etc/pam_radius_auth.conf') tmp = re.findall(r'\n?\[{}\]:{}\s+{}\s+{}\s+\[{}\]'.format(radius_server, radius_port, radius_key, radius_timeout, radius_source), pam_radius_auth_conf) self.assertTrue(tmp) # required, static options self.assertIn('priv-lvl 15', pam_radius_auth_conf) self.assertIn('mapped_priv_user radius_priv_user', pam_radius_auth_conf) # PAM pam_common_account = read_file('/etc/pam.d/common-account') self.assertIn('pam_radius_auth.so', pam_common_account) pam_common_auth = read_file('/etc/pam.d/common-auth') self.assertIn('pam_radius_auth.so', pam_common_auth) pam_common_session = read_file('/etc/pam.d/common-session') self.assertIn('pam_radius_auth.so', pam_common_session) pam_common_session_noninteractive = read_file('/etc/pam.d/common-session-noninteractive') self.assertIn('pam_radius_auth.so', pam_common_session_noninteractive) # NSS nsswitch_conf = read_file('/etc/nsswitch.conf') tmp = re.findall(r'passwd:\s+mapuid\s+files\s+mapname', nsswitch_conf) self.assertTrue(tmp) tmp = re.findall(r'group:\s+mapname\s+files', nsswitch_conf) self.assertTrue(tmp) def test_system_login_max_login_session(self): max_logins = '2' timeout = '600' self.cli_set(base_path + ['max-login-session', max_logins]) # 'max-login-session' must be only with 'timeout' option with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['timeout', timeout]) self.cli_commit() security_limits = read_file('/etc/security/limits.d/10-vyos.conf') self.assertIn(f'* - maxsyslogins {max_logins}', security_limits) self.cli_delete(base_path + ['timeout']) self.cli_delete(base_path + ['max-login-session']) def test_system_login_tacacs(self): tacacs_secret = 'tac_plus_key' tacacs_servers = ['100.64.0.11', '100.64.0.12'] # Enable TACACS for server in tacacs_servers: self.cli_set(base_path + ['tacacs', 'server', server, 'key', tacacs_secret]) self.cli_commit() # NSS nsswitch_conf = read_file('/etc/nsswitch.conf') tmp = re.findall(r'passwd:\s+tacplus\s+files', nsswitch_conf) self.assertTrue(tmp) tmp = re.findall(r'group:\s+tacplus\s+files', nsswitch_conf) self.assertTrue(tmp) # PAM TACACS configuration pam_tacacs_conf = read_file('/etc/tacplus_servers') # NSS TACACS configuration nss_tacacs_conf = read_file('/etc/tacplus_nss.conf') # Users have individual home directories self.assertIn('user_homedir=1', pam_tacacs_conf) # specify services self.assertIn('service=shell', pam_tacacs_conf) self.assertIn('protocol=ssh', pam_tacacs_conf) for server in tacacs_servers: self.assertIn(f'secret={tacacs_secret}', pam_tacacs_conf) self.assertIn(f'server={server}', pam_tacacs_conf) self.assertIn(f'secret={tacacs_secret}', nss_tacacs_conf) self.assertIn(f'server={server}', nss_tacacs_conf) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 30e823bd4..cff0c5039 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -1,427 +1,433 @@ #!/usr/bin/env python3 # # Copyright (C) 2020-2023 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from passlib.hosts import linux_context from psutil import users from pwd import getpwall from pwd import getpwnam from pwd import getpwuid from sys import exit from time import sleep from vyos.config import Config from vyos.configverify import verify_vrf from vyos.defaults import directories from vyos.template import render from vyos.template import is_ipv4 from vyos.utils.dict import dict_search from vyos.utils.file import chown from vyos.utils.process import cmd from vyos.utils.process import call from vyos.utils.process import rc_cmd from vyos.utils.process import run from vyos.utils.process import DEVNULL from vyos import ConfigError from vyos import airbag airbag.enable() autologout_file = "/etc/profile.d/autologout.sh" limits_file = "/etc/security/limits.d/10-vyos.conf" radius_config_file = "/etc/pam_radius_auth.conf" tacacs_pam_config_file = "/etc/tacplus_servers" tacacs_nss_config_file = "/etc/tacplus_nss.conf" nss_config_file = "/etc/nsswitch.conf" # Minimum UID used when adding system users MIN_USER_UID: int = 1000 # Maximim UID used when adding system users MAX_USER_UID: int = 59999 # LOGIN_TIMEOUT from /etc/loign.defs minus 10 sec MAX_RADIUS_TIMEOUT: int = 50 # MAX_RADIUS_TIMEOUT divided by 2 sec (minimum recomended timeout) MAX_RADIUS_COUNT: int = 8 # Maximum number of supported TACACS servers MAX_TACACS_COUNT: int = 8 # List of local user accounts that must be preserved SYSTEM_USER_SKIP_LIST: list = ['radius_user', 'radius_priv_user', 'tacacs0', 'tacacs1', 'tacacs2', 'tacacs3', 'tacacs4', 'tacacs5', 'tacacs6', 'tacacs7', 'tacacs8', 'tacacs9', 'tacacs10',' tacacs11', 'tacacs12', 'tacacs13', 'tacacs14', 'tacacs15'] def get_local_users(): """Return list of dynamically allocated users (see Debian Policy Manual)""" local_users = [] for s_user in getpwall(): if getpwnam(s_user.pw_name).pw_uid < MIN_USER_UID: continue if getpwnam(s_user.pw_name).pw_uid > MAX_USER_UID: continue if s_user.pw_name in SYSTEM_USER_SKIP_LIST: continue local_users.append(s_user.pw_name) return local_users def get_shadow_password(username): with open('/etc/shadow') as f: for user in f.readlines(): items = user.split(":") if username == items[0]: return items[1] return None def get_config(config=None): if config: conf = config else: conf = Config() base = ['system', 'login'] login = conf.get_config_dict(base, key_mangling=('-', '_'), no_tag_node_value_mangle=True, get_first_key=True, with_recursive_defaults=True) # users no longer existing in the running configuration need to be deleted local_users = get_local_users() cli_users = [] if 'user' in login: cli_users = list(login['user']) # prune TACACS global defaults if not set by user if login.from_defaults(['tacacs']): del login['tacacs'] # same for RADIUS if login.from_defaults(['radius']): del login['radius'] # create a list of all users, cli and users all_users = list(set(local_users + cli_users)) # We will remove any normal users that dos not exist in the current # configuration. This can happen if user is added but configuration was not # saved and system is rebooted. rm_users = [tmp for tmp in all_users if tmp not in cli_users] if rm_users: login.update({'rm_users' : rm_users}) return login def verify(login): if 'rm_users' in login: # This check is required as the script is also executed from vyos-router # init script and there is no SUDO_USER environment variable available # during system boot. if 'SUDO_USER' in os.environ: cur_user = os.environ['SUDO_USER'] if cur_user in login['rm_users']: raise ConfigError(f'Attempting to delete current user: {cur_user}') if 'user' in login: system_users = getpwall() for user, user_config in login['user'].items(): # Linux system users range up until UID 1000, we can not create a # VyOS CLI user which already exists as system user for s_user in system_users: if s_user.pw_name == user and s_user.pw_uid < MIN_USER_UID: raise ConfigError(f'User "{user}" can not be created, conflict with local system account!') for pubkey, pubkey_options in (dict_search('authentication.public_keys', user_config) or {}).items(): if 'type' not in pubkey_options: raise ConfigError(f'Missing type for public-key "{pubkey}"!') if 'key' not in pubkey_options: raise ConfigError(f'Missing key for public-key "{pubkey}"!') if {'radius', 'tacacs'} <= set(login): raise ConfigError('Using both RADIUS and TACACS at the same time is not supported!') # At lease one RADIUS server must not be disabled if 'radius' in login: if 'server' not in login['radius']: raise ConfigError('No RADIUS server defined!') sum_timeout: int = 0 radius_servers_count: int = 0 fail = True for server, server_config in dict_search('radius.server', login).items(): if 'key' not in server_config: raise ConfigError(f'RADIUS server "{server}" requires key!') if 'disable' not in server_config: sum_timeout += int(server_config['timeout']) radius_servers_count += 1 fail = False if fail: raise ConfigError('All RADIUS servers are disabled') if radius_servers_count > MAX_RADIUS_COUNT: raise ConfigError(f'Number of RADIUS servers exceeded maximum of {MAX_RADIUS_COUNT}!') if sum_timeout > MAX_RADIUS_TIMEOUT: raise ConfigError('Sum of RADIUS servers timeouts ' 'has to be less or eq 50 sec') verify_vrf(login['radius']) if 'source_address' in login['radius']: ipv4_count = 0 ipv6_count = 0 for address in login['radius']['source_address']: if is_ipv4(address): ipv4_count += 1 else: ipv6_count += 1 if ipv4_count > 1: raise ConfigError('Only one IPv4 source-address can be set!') if ipv6_count > 1: raise ConfigError('Only one IPv6 source-address can be set!') if 'tacacs' in login: tacacs_servers_count: int = 0 fail = True for server, server_config in dict_search('tacacs.server', login).items(): if 'key' not in server_config: raise ConfigError(f'TACACS server "{server}" requires key!') if 'disable' not in server_config: tacacs_servers_count += 1 fail = False if fail: raise ConfigError('All RADIUS servers are disabled') if tacacs_servers_count > MAX_TACACS_COUNT: raise ConfigError(f'Number of TACACS servers exceeded maximum of {MAX_TACACS_COUNT}!') verify_vrf(login['tacacs']) if 'max_login_session' in login and 'timeout' not in login: raise ConfigError('"login timeout" must be configured!') return None def generate(login): # calculate users encrypted password if 'user' in login: for user, user_config in login['user'].items(): tmp = dict_search('authentication.plaintext_password', user_config) if tmp: encrypted_password = linux_context.hash(tmp) login['user'][user]['authentication']['encrypted_password'] = encrypted_password del login['user'][user]['authentication']['plaintext_password'] # remove old plaintext password and set new encrypted password env = os.environ.copy() env['vyos_libexec_dir'] = directories['base'] # Set default commands for re-adding user with encrypted password del_user_plain = f"system login user {user} authentication plaintext-password" add_user_encrypt = f"system login user {user} authentication encrypted-password '{encrypted_password}'" lvl = env['VYATTA_EDIT_LEVEL'] # We're in config edit level, for example "edit system login" # Change default commands for re-adding user with encrypted password if lvl != '/': # Replace '/system/login' to 'system login' lvl = lvl.strip('/').split('/') # Convert command str to list del_user_plain = del_user_plain.split() # New command exclude level, for example "edit system login" del_user_plain = del_user_plain[len(lvl):] # Convert string to list del_user_plain = " ".join(del_user_plain) add_user_encrypt = add_user_encrypt.split() add_user_encrypt = add_user_encrypt[len(lvl):] add_user_encrypt = " ".join(add_user_encrypt) ret, out = rc_cmd(f"/opt/vyatta/sbin/my_delete {del_user_plain}", env=env) if ret: raise ConfigError(out) ret, out = rc_cmd(f"/opt/vyatta/sbin/my_set {add_user_encrypt}", env=env) if ret: raise ConfigError(out) else: try: if get_shadow_password(user) == dict_search('authentication.encrypted_password', user_config): # If the current encrypted bassword matches the encrypted password # from the config - do not update it. This will remove the encrypted # value from the system logs. # # The encrypted password will be set only once during the first boot # after an image upgrade. del login['user'][user]['authentication']['encrypted_password'] except: pass ### RADIUS based user authentication if 'radius' in login: render(radius_config_file, 'login/pam_radius_auth.conf.j2', login, permission=0o600, user='root', group='root') else: if os.path.isfile(radius_config_file): os.unlink(radius_config_file) ### TACACS+ based user authentication if 'tacacs' in login: render(tacacs_pam_config_file, 'login/tacplus_servers.j2', login, permission=0o644, user='root', group='root') render(tacacs_nss_config_file, 'login/tacplus_nss.conf.j2', login, permission=0o644, user='root', group='root') else: if os.path.isfile(tacacs_pam_config_file): os.unlink(tacacs_pam_config_file) if os.path.isfile(tacacs_nss_config_file): os.unlink(tacacs_nss_config_file) # NSS must always be present on the system render(nss_config_file, 'login/nsswitch.conf.j2', login, permission=0o644, user='root', group='root') # /etc/security/limits.d/10-vyos.conf if 'max_login_session' in login: render(limits_file, 'login/limits.j2', login, permission=0o644, user='root', group='root') else: if os.path.isfile(limits_file): os.unlink(limits_file) if 'timeout' in login: render(autologout_file, 'login/autologout.j2', login, permission=0o755, user='root', group='root') else: if os.path.isfile(autologout_file): os.unlink(autologout_file) return None def apply(login): enable_otp = False if 'user' in login: for user, user_config in login['user'].items(): # make new user using vyatta shell and make home directory (-m), # default group of 100 (users) command = 'useradd --create-home --no-user-group ' # check if user already exists: if user in get_local_users(): # update existing account command = 'usermod' # all accounts use /bin/vbash command += ' --shell /bin/vbash' # we need to use '' quotes when passing formatted data to the shell # else it will not work as some data parts are lost in translation tmp = dict_search('authentication.encrypted_password', user_config) if tmp: command += f" --password '{tmp}'" tmp = dict_search('full_name', user_config) if tmp: command += f" --comment '{tmp}'" tmp = dict_search('home_directory', user_config) if tmp: command += f" --home '{tmp}'" else: command += f" --home '/home/{user}'" command += f' --groups frr,frrvty,vyattacfg,sudo,adm,dip,disk {user}' try: cmd(command) # we should not rely on the value stored in # user_config['home_directory'], as a crazy user will choose # username root or any other system user which will fail. # # XXX: Should we deny using root at all? home_dir = getpwnam(user).pw_dir # T5875: ensure UID is properly set on home directory if user is re-added # the home directory will always exist, as it's created above by --create-home, # retrieve current owner of home directory and adjust it on demand dir_owner = getpwuid(os.stat(home_dir).st_uid).pw_name if dir_owner != user: chown(home_dir, user=user, recursive=True) render(f'{home_dir}/.ssh/authorized_keys', 'login/authorized_keys.j2', user_config, permission=0o600, formater=lambda _: _.replace(""", '"'), user=user, group='users') except Exception as e: raise ConfigError(f'Adding user "{user}" raised exception: "{e}"') # Generate 2FA/MFA One-Time-Pad configuration if dict_search('authentication.otp.key', user_config): enable_otp = True render(f'{home_dir}/.google_authenticator', 'login/pam_otp_ga.conf.j2', user_config, permission=0o400, user=user, group='users') else: # delete configuration as it's not enabled for the user if os.path.exists(f'{home_dir}/.google_authenticator'): os.remove(f'{home_dir}/.google_authenticator') + # Lock/Unlock local user account + lock_unlock = '--unlock' + if 'disable' in user_config: + lock_unlock = '--lock' + cmd(f'usermod {lock_unlock} {user}') + if 'rm_users' in login: for user in login['rm_users']: try: # Disable user to prevent re-login call(f'usermod -s /sbin/nologin {user}') # Logout user if he is still logged in if user in list(set([tmp[0] for tmp in users()])): print(f'{user} is logged in, forcing logout!') # re-run command until user is logged out while run(f'pkill -HUP -u {user}'): sleep(0.250) # Remove user account but leave home directory in place. Re-run # command until user is removed - userdel might return 8 as # SSH sessions are not all yet properly cleaned away, thus we # simply re-run the command until the account wen't away while run(f'userdel {user}', stderr=DEVNULL): sleep(0.250) except Exception as e: raise ConfigError(f'Deleting user "{user}" raised exception: {e}') # Enable/disable RADIUS in PAM configuration cmd('pam-auth-update --disable radius-mandatory radius-optional') if 'radius' in login: if login['radius'].get('security_mode', '') == 'mandatory': pam_profile = 'radius-mandatory' else: pam_profile = 'radius-optional' cmd(f'pam-auth-update --enable {pam_profile}') # Enable/disable TACACS+ in PAM configuration cmd('pam-auth-update --disable tacplus-mandatory tacplus-optional') if 'tacacs' in login: if login['tacacs'].get('security_mode', '') == 'mandatory': pam_profile = 'tacplus-mandatory' else: pam_profile = 'tacplus-optional' cmd(f'pam-auth-update --enable {pam_profile}') # Enable/disable Google authenticator cmd('pam-auth-update --disable mfa-google-authenticator') if enable_otp: cmd(f'pam-auth-update --enable mfa-google-authenticator') return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)