diff --git a/smoketest/scripts/cli/test_service_snmp.py b/smoketest/scripts/cli/test_service_snmp.py index b3daa90d0..7d5eaa440 100755 --- a/smoketest/scripts/cli/test_service_snmp.py +++ b/smoketest/scripts/cli/test_service_snmp.py @@ -1,250 +1,264 @@ #!/usr/bin/env python3 # # Copyright (C) 2019-2021 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import re import unittest from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.template import is_ipv4 from vyos.template import address_from_cidr from vyos.utils.process import call from vyos.utils.process import DEVNULL from vyos.utils.file import read_file from vyos.utils.process import process_named_running from vyos.version import get_version_data PROCESS_NAME = 'snmpd' SNMPD_CONF = '/etc/snmp/snmpd.conf' base_path = ['service', 'snmp'] snmpv3_group = 'default_group' snmpv3_view = 'default_view' snmpv3_view_oid = '1' snmpv3_user = 'vyos' snmpv3_auth_pw = 'vyos12345678' snmpv3_priv_pw = 'vyos87654321' snmpv3_engine_id = '000000000000000000000002' def get_config_value(key): tmp = read_file(SNMPD_CONF) tmp = re.findall(r'\n?{}\s+(.*)'.format(key), tmp) return tmp[0] class TestSNMPService(VyOSUnitTestSHIM.TestCase): @classmethod def setUpClass(cls): super(TestSNMPService, cls).setUpClass() # ensure we can also run this test on a live system - so lets clean # out the current configuration :) cls.cli_delete(cls, base_path) def tearDown(self): # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) # delete testing SNMP config self.cli_delete(base_path) self.cli_commit() # Check for running process self.assertFalse(process_named_running(PROCESS_NAME)) def test_snmp_basic(self): dummy_if = 'dum7312' dummy_addr = '100.64.0.1/32' contact = 'maintainers@vyos.io' location = 'QEMU' self.cli_set(['interfaces', 'dummy', dummy_if, 'address', dummy_addr]) # Check if SNMP can be configured and service runs clients = ['192.0.2.1', '2001:db8::1'] networks = ['192.0.2.128/25', '2001:db8:babe::/48'] listen = ['127.0.0.1', '::1', address_from_cidr(dummy_addr)] port = '5000' for auth in ['ro', 'rw']: community = 'VyOS' + auth self.cli_set(base_path + ['community', community, 'authorization', auth]) for client in clients: self.cli_set(base_path + ['community', community, 'client', client]) for network in networks: self.cli_set(base_path + ['community', community, 'network', network]) for addr in listen: self.cli_set(base_path + ['listen-address', addr, 'port', port]) self.cli_set(base_path + ['contact', contact]) self.cli_set(base_path + ['location', location]) self.cli_commit() # verify listen address, it will be returned as # ['unix:/run/snmpd.socket,udp:127.0.0.1:161,udp6:[::1]:161'] # thus we need to transfor this into a proper list config = get_config_value('agentaddress') expected = 'unix:/run/snmpd.socket' self.assertIn(expected, config) for addr in listen: if is_ipv4(addr): expected = f'udp:{addr}:{port}' else: expected = f'udp6:[{addr}]:{port}' self.assertIn(expected, config) config = get_config_value('sysDescr') version_data = get_version_data() self.assertEqual('VyOS ' + version_data['version'], config) config = get_config_value('SysContact') self.assertEqual(contact, config) config = get_config_value('SysLocation') self.assertEqual(location, config) # Check for running process self.assertTrue(process_named_running(PROCESS_NAME)) self.cli_delete(['interfaces', 'dummy', dummy_if]) ## Check communities and default view RESTRICTED for auth in ['ro', 'rw']: community = 'VyOS' + auth for addr in clients: if is_ipv4(addr): entry = auth + 'community ' + community + ' ' + addr + ' -V' else: entry = auth + 'community6 ' + community + ' ' + addr + ' -V' config = get_config_value(entry) expected = 'RESTRICTED' self.assertIn(expected, config) for addr in networks: if is_ipv4(addr): entry = auth + 'community ' + community + ' ' + addr + ' -V' else: entry = auth + 'community6 ' + community + ' ' + addr + ' -V' config = get_config_value(entry) expected = 'RESTRICTED' self.assertIn(expected, config) # And finally check global entry for RESTRICTED view config = get_config_value('view RESTRICTED included .1') self.assertIn('80', config) def test_snmpv3_sha(self): # Check if SNMPv3 can be configured with SHA authentication # and service runs self.cli_set(base_path + ['v3', 'engineid', snmpv3_engine_id]) self.cli_set(base_path + ['v3', 'group', 'default', 'mode', 'ro']) # check validate() - a view must be created before this can be committed with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['v3', 'view', 'default', 'oid', '1']) self.cli_set(base_path + ['v3', 'group', 'default', 'view', 'default']) # create user self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'plaintext-password', snmpv3_auth_pw]) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'type', 'sha']) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'plaintext-password', snmpv3_priv_pw]) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'type', 'aes']) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'group', 'default']) self.cli_commit() # commit will alter the CLI values - check if they have been updated: hashed_password = '4e52fe55fd011c9c51ae2c65f4b78ca93dcafdfe' tmp = self._session.show_config(base_path + ['v3', 'user', snmpv3_user, 'auth', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) hashed_password = '54705c8de9e81fdf61ad7ac044fa8fe611ddff6b' tmp = self._session.show_config(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) # TODO: read in config file and check values # Try SNMPv3 connection tmp = call(f'snmpwalk -v 3 -u {snmpv3_user} -a SHA -A {snmpv3_auth_pw} -x AES -X {snmpv3_priv_pw} -l authPriv 127.0.0.1', stdout=DEVNULL) self.assertEqual(tmp, 0) def test_snmpv3_md5(self): # Check if SNMPv3 can be configured with MD5 authentication # and service runs self.cli_set(base_path + ['v3', 'engineid', snmpv3_engine_id]) # create user self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'plaintext-password', snmpv3_auth_pw]) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'auth', 'type', 'md5']) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'plaintext-password', snmpv3_priv_pw]) self.cli_set(base_path + ['v3', 'user', snmpv3_user, 'privacy', 'type', 'des']) # check validate() - user requires a group to be created with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['v3', 'user', 'vyos', 'group', snmpv3_group]) self.cli_set(base_path + ['v3', 'group', snmpv3_group, 'mode', 'ro']) # check validate() - a view must be created before this can be comitted with self.assertRaises(ConfigSessionError): self.cli_commit() self.cli_set(base_path + ['v3', 'view', snmpv3_view, 'oid', snmpv3_view_oid]) self.cli_set(base_path + ['v3', 'group', snmpv3_group, 'view', snmpv3_view]) self.cli_commit() # commit will alter the CLI values - check if they have been updated: hashed_password = '4c67690d45d3dfcd33d0d7e308e370ad' tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'auth', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) hashed_password = 'e11c83f2c510540a3c4de84ee66de440' tmp = self._session.show_config(base_path + ['v3', 'user', 'vyos', 'privacy', 'encrypted-password']).split()[1] self.assertEqual(tmp, hashed_password) tmp = read_file(SNMPD_CONF) # views self.assertIn(f'view {snmpv3_view} included .{snmpv3_view_oid}', tmp) # group self.assertIn(f'group {snmpv3_group} usm {snmpv3_user}', tmp) # access self.assertIn(f'access {snmpv3_group} "" usm auth exact {snmpv3_view} none none', tmp) # Try SNMPv3 connection tmp = call(f'snmpwalk -v 3 -u {snmpv3_user} -a MD5 -A {snmpv3_auth_pw} -x DES -X {snmpv3_priv_pw} -l authPriv 127.0.0.1', stdout=DEVNULL) self.assertEqual(tmp, 0) def test_snmpv3_view_exclude(self): snmpv3_view_oid_exclude = ['1.3.6.1.2.1.4.21', '1.3.6.1.2.1.4.24'] self.cli_set(base_path + ['v3', 'group', snmpv3_group, 'view', snmpv3_view]) self.cli_set(base_path + ['v3', 'view', snmpv3_view, 'oid', snmpv3_view_oid]) for excluded in snmpv3_view_oid_exclude: self.cli_set(base_path + ['v3', 'view', snmpv3_view, 'oid', snmpv3_view_oid, 'exclude', excluded]) self.cli_commit() tmp = read_file(SNMPD_CONF) # views self.assertIn(f'view {snmpv3_view} included .{snmpv3_view_oid}', tmp) for excluded in snmpv3_view_oid_exclude: self.assertIn(f'view {snmpv3_view} excluded .{excluded}', tmp) + def test_snmp_script_extensions(self): + extensions = { + 'default': 'snmp_smoketest_extension_script.sh', + 'external': '/run/external_snmp_smoketest_extension_script.sh' + } + + for key, val in extensions.items(): + self.cli_set(base_path + ['script-extensions', 'extension-name', key, 'script', val]) + self.cli_commit() + + self.assertEqual(get_config_value('extend default'), f'/config/user-data/{extensions["default"]}') + self.assertEqual(get_config_value('extend external'), extensions["external"]) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/conf_mode/service_snmp.py b/src/conf_mode/service_snmp.py index 6f025cc23..c9c0ed9a0 100755 --- a/src/conf_mode/service_snmp.py +++ b/src/conf_mode/service_snmp.py @@ -1,269 +1,282 @@ #!/usr/bin/env python3 # # Copyright (C) 2018-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os from sys import exit from vyos.base import Warning from vyos.config import Config from vyos.configdict import dict_merge from vyos.configverify import verify_vrf from vyos.snmpv3_hashgen import plaintext_to_md5 from vyos.snmpv3_hashgen import plaintext_to_sha1 from vyos.snmpv3_hashgen import random from vyos.template import render from vyos.utils.configfs import delete_cli_node from vyos.utils.configfs import add_cli_node from vyos.utils.dict import dict_search from vyos.utils.network import is_addr_assigned from vyos.utils.process import call from vyos.utils.permission import chmod_755 from vyos.version import get_version_data from vyos import ConfigError from vyos import airbag airbag.enable() config_file_client = r'/etc/snmp/snmp.conf' config_file_daemon = r'/etc/snmp/snmpd.conf' config_file_access = r'/usr/share/snmp/snmpd.conf' config_file_user = r'/var/lib/snmp/snmpd.conf' +default_script_dir = r'/config/user-data/' systemd_override = r'/run/systemd/system/snmpd.service.d/override.conf' systemd_service = 'snmpd.service' def get_config(config=None): if config: conf = config else: conf = Config() base = ['service', 'snmp'] snmp = conf.get_config_dict(base, key_mangling=('-', '_'), get_first_key=True, no_tag_node_value_mangle=True) if not conf.exists(base): snmp.update({'deleted' : ''}) if conf.exists(['service', 'lldp', 'snmp']): snmp.update({'lldp_snmp' : ''}) if 'deleted' in snmp: return snmp version_data = get_version_data() snmp['version'] = version_data['version'] # create an internal snmpv3 user of the form 'vyosxxxxxxxxxxxxxxxx' snmp['vyos_user'] = 'vyos' + random(8) snmp['vyos_user_pass'] = random(16) # We have gathered the dict representation of the CLI, but there are default # options which we need to update into the dictionary retrived. snmp = conf.merge_defaults(snmp, recursive=True) if 'listen_address' in snmp: # Always listen on localhost if an explicit address has been configured # This is a safety measure to not end up with invalid listen addresses # that are not configured on this system. See https://vyos.dev/T850 if '127.0.0.1' not in snmp['listen_address']: tmp = {'127.0.0.1': {'port': '161'}} snmp['listen_address'] = dict_merge(tmp, snmp['listen_address']) if '::1' not in snmp['listen_address']: tmp = {'::1': {'port': '161'}} snmp['listen_address'] = dict_merge(tmp, snmp['listen_address']) + if 'script_extensions' in snmp and 'extension_name' in snmp['script_extensions']: + for key, val in snmp['script_extensions']['extension_name'].items(): + if 'script' not in val: + continue + script_path = val['script'] + # if script has not absolute path, use pre configured path + if not os.path.isabs(script_path): + script_path = os.path.join(default_script_dir, script_path) + + snmp['script_extensions']['extension_name'][key]['script'] = script_path + return snmp + def verify(snmp): if 'deleted' in snmp: return None if {'deleted', 'lldp_snmp'} <= set(snmp): raise ConfigError('Can not delete SNMP service, as LLDP still uses SNMP!') ### check if the configured script actually exist if 'script_extensions' in snmp and 'extension_name' in snmp['script_extensions']: for extension, extension_opt in snmp['script_extensions']['extension_name'].items(): if 'script' not in extension_opt: raise ConfigError(f'Script extension "{extension}" requires an actual script to be configured!') tmp = extension_opt['script'] if not os.path.isfile(tmp): Warning(f'script "{tmp}" does not exist!') else: chmod_755(extension_opt['script']) if 'listen_address' in snmp: for address in snmp['listen_address']: # We only wan't to configure addresses that exist on the system. # Hint the user if they don't exist if 'vrf' in snmp: vrf_name = snmp['vrf'] if not is_addr_assigned(address, vrf_name) and address not in ['::1','127.0.0.1']: raise ConfigError(f'SNMP listen address "{address}" not configured in vrf "{vrf_name}"!') elif not is_addr_assigned(address): raise ConfigError(f'SNMP listen address "{address}" not configured in default vrf!') if 'trap_target' in snmp: for trap, trap_config in snmp['trap_target'].items(): if 'community' not in trap_config: raise ConfigError(f'Trap target "{trap}" requires a community to be set!') if 'oid_enable' in snmp: Warning(f'Custom OIDs are enabled and may lead to system instability and high resource consumption') verify_vrf(snmp) # bail out early if SNMP v3 is not configured if 'v3' not in snmp: return None if 'user' in snmp['v3']: for user, user_config in snmp['v3']['user'].items(): if 'group' not in user_config: raise ConfigError(f'Group membership required for user "{user}"!') if 'plaintext_password' not in user_config['auth'] and 'encrypted_password' not in user_config['auth']: raise ConfigError(f'Must specify authentication encrypted-password or plaintext-password for user "{user}"!') if 'plaintext_password' not in user_config['privacy'] and 'encrypted_password' not in user_config['privacy']: raise ConfigError(f'Must specify privacy encrypted-password or plaintext-password for user "{user}"!') if 'group' in snmp['v3']: for group, group_config in snmp['v3']['group'].items(): if 'seclevel' not in group_config: raise ConfigError(f'Must configure "seclevel" for group "{group}"!') if 'view' not in group_config: raise ConfigError(f'Must configure "view" for group "{group}"!') # Check if 'view' exists view = group_config['view'] if 'view' not in snmp['v3'] or view not in snmp['v3']['view']: raise ConfigError(f'You must create view "{view}" first!') if 'view' in snmp['v3']: for view, view_config in snmp['v3']['view'].items(): if 'oid' not in view_config: raise ConfigError(f'Must configure an "oid" for view "{view}"!') if 'trap_target' in snmp['v3']: for trap, trap_config in snmp['v3']['trap_target'].items(): if 'plaintext_password' not in trap_config['auth'] and 'encrypted_password' not in trap_config['auth']: raise ConfigError(f'Must specify one of authentication encrypted-password or plaintext-password for trap "{trap}"!') if {'plaintext_password', 'encrypted_password'} <= set(trap_config['auth']): raise ConfigError(f'Can not specify both authentication encrypted-password and plaintext-password for trap "{trap}"!') if 'plaintext_password' not in trap_config['privacy'] and 'encrypted_password' not in trap_config['privacy']: raise ConfigError(f'Must specify one of privacy encrypted-password or plaintext-password for trap "{trap}"!') if {'plaintext_password', 'encrypted_password'} <= set(trap_config['privacy']): raise ConfigError(f'Can not specify both privacy encrypted-password and plaintext-password for trap "{trap}"!') if 'type' not in trap_config: raise ConfigError('SNMP v3 trap "type" must be specified!') return None def generate(snmp): # As we are manipulating the snmpd user database we have to stop it first! # This is even save if service is going to be removed call(f'systemctl stop {systemd_service}') # Clean config files config_files = [config_file_client, config_file_daemon, config_file_access, config_file_user, systemd_override] for file in config_files: if os.path.isfile(file): os.unlink(file) if 'deleted' in snmp: return None if 'v3' in snmp: # SNMPv3 uses a hashed password. If CLI defines a plaintext password, # we will hash it in the background and replace the CLI node! if 'user' in snmp['v3']: for user, user_config in snmp['v3']['user'].items(): if dict_search('auth.type', user_config) == 'sha': hash = plaintext_to_sha1 else: hash = plaintext_to_md5 if dict_search('auth.plaintext_password', user_config) is not None: tmp = hash(dict_search('auth.plaintext_password', user_config), dict_search('v3.engineid', snmp)) snmp['v3']['user'][user]['auth']['encrypted_password'] = tmp del snmp['v3']['user'][user]['auth']['plaintext_password'] cli_base = ['service', 'snmp', 'v3', 'user', user, 'auth'] delete_cli_node(cli_base + ['plaintext-password']) add_cli_node(cli_base + ['encrypted-password'], value=tmp) if dict_search('privacy.plaintext_password', user_config) is not None: tmp = hash(dict_search('privacy.plaintext_password', user_config), dict_search('v3.engineid', snmp)) snmp['v3']['user'][user]['privacy']['encrypted_password'] = tmp del snmp['v3']['user'][user]['privacy']['plaintext_password'] cli_base = ['service', 'snmp', 'v3', 'user', user, 'privacy'] delete_cli_node(cli_base + ['plaintext-password']) add_cli_node(cli_base + ['encrypted-password'], value=tmp) # Write client config file render(config_file_client, 'snmp/etc.snmp.conf.j2', snmp) # Write server config file render(config_file_daemon, 'snmp/etc.snmpd.conf.j2', snmp) # Write access rights config file render(config_file_access, 'snmp/usr.snmpd.conf.j2', snmp) # Write access rights config file render(config_file_user, 'snmp/var.snmpd.conf.j2', snmp) # Write daemon configuration file render(systemd_override, 'snmp/override.conf.j2', snmp) return None def apply(snmp): # Always reload systemd manager configuration call('systemctl daemon-reload') if 'deleted' in snmp: return None # start SNMP daemon call(f'systemctl reload-or-restart {systemd_service}') # Enable AgentX in FRR # This should be done for each daemon individually because common command # works only if all the daemons started with SNMP support # Following daemons from FRR 9.0/stable have SNMP module compiled in VyOS frr_daemons_list = ['zebra', 'bgpd', 'ospf6d', 'ospfd', 'ripd', 'isisd', 'ldpd'] for frr_daemon in frr_daemons_list: call(f'vtysh -c "configure terminal" -d {frr_daemon} -c "agentx" >/dev/null') return None if __name__ == '__main__': try: c = get_config() verify(c) generate(c) apply(c) except ConfigError as e: print(e) exit(1)