Page MenuHomeVyOS Platform

test_service_dns_forwarding.py
No OneTemporary

Size
7 KB
Referenced Files
None
Subscribers
None

test_service_dns_forwarding.py

#!/usr/bin/env python3
#
# Copyright (C) 2019-2021 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import unittest
from base_vyostest_shim import VyOSUnitTestSHIM
from vyos.configsession import ConfigSessionError
from vyos.util import read_file
from vyos.util import process_named_running
CONFIG_FILE = '/run/powerdns/recursor.conf'
FORWARD_FILE = '/run/powerdns/recursor.forward-zones.conf'
HOSTSD_FILE = '/run/powerdns/recursor.vyos-hostsd.conf.lua'
PROCESS_NAME= 'pdns-r/worker'
base_path = ['service', 'dns', 'forwarding']
allow_from = ['192.0.2.0/24', '2001:db8::/32']
listen_adress = ['127.0.0.1', '::1']
def get_config_value(key, file=CONFIG_FILE):
tmp = read_file(file)
tmp = re.findall(r'\n{}=+(.*)'.format(key), tmp)
return tmp[0]
class TestServicePowerDNS(VyOSUnitTestSHIM.TestCase):
def tearDown(self):
# Delete DNS forwarding configuration
self.cli_delete(base_path)
self.cli_commit()
# Check for running process
self.assertFalse(process_named_running(PROCESS_NAME))
def test_basic_forwarding(self):
# Check basic DNS forwarding settings
cache_size = '20'
negative_ttl = '120'
self.cli_set(base_path + ['cache-size', cache_size])
self.cli_set(base_path + ['negative-ttl', negative_ttl])
# check validate() - allow from must be defined
with self.assertRaises(ConfigSessionError):
self.cli_commit()
for network in allow_from:
self.cli_set(base_path + ['allow-from', network])
# check validate() - listen-address must be defined
with self.assertRaises(ConfigSessionError):
self.cli_commit()
for address in listen_adress:
self.cli_set(base_path + ['listen-address', address])
# configure DNSSEC
self.cli_set(base_path + ['dnssec', 'validate'])
# Do not use local /etc/hosts file in name resolution
self.cli_set(base_path + ['ignore-hosts-file'])
# commit changes
self.cli_commit()
# Check configured cache-size
tmp = get_config_value('max-cache-entries')
self.assertEqual(tmp, cache_size)
# Networks allowed to query this server
tmp = get_config_value('allow-from')
self.assertEqual(tmp, ','.join(allow_from))
# Addresses to listen for DNS queries
tmp = get_config_value('local-address')
self.assertEqual(tmp, ','.join(listen_adress))
# Maximum amount of time negative entries are cached
tmp = get_config_value('max-negative-ttl')
self.assertEqual(tmp, negative_ttl)
# Do not use local /etc/hosts file in name resolution
tmp = get_config_value('export-etc-hosts')
self.assertEqual(tmp, 'no')
# RFC1918 addresses are looked up by default
tmp = get_config_value('serve-rfc1918')
self.assertEqual(tmp, 'yes')
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
def test_dnssec(self):
# DNSSEC option testing
for network in allow_from:
self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
self.cli_set(base_path + ['listen-address', address])
options = ['off', 'process-no-validate', 'process', 'log-fail', 'validate']
for option in options:
self.cli_set(base_path + ['dnssec', option])
# commit changes
self.cli_commit()
tmp = get_config_value('dnssec')
self.assertEqual(tmp, option)
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
def test_external_nameserver(self):
# Externe Domain Name Servers (DNS) addresses
for network in allow_from:
self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
self.cli_set(base_path + ['listen-address', address])
nameservers = ['192.0.2.1', '192.0.2.2']
for nameserver in nameservers:
self.cli_set(base_path + ['name-server', nameserver])
# commit changes
self.cli_commit()
tmp = get_config_value(r'\+.', file=FORWARD_FILE)
self.assertEqual(tmp, ', '.join(nameservers))
# Do not use local /etc/hosts file in name resolution
# default: yes
tmp = get_config_value('export-etc-hosts')
self.assertEqual(tmp, 'yes')
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
def test_domain_forwarding(self):
for network in allow_from:
self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
self.cli_set(base_path + ['listen-address', address])
domains = ['vyos.io', 'vyos.net', 'vyos.com']
nameservers = ['192.0.2.1', '192.0.2.2']
for domain in domains:
for nameserver in nameservers:
self.cli_set(base_path + ['domain', domain, 'server', nameserver])
# Test 'recursion-desired' flag for only one domain
if domain == domains[0]:
self.cli_set(base_path + ['domain', domain, 'recursion-desired'])
# Test 'negative trust anchor' flag for the second domain only
if domain == domains[1]:
self.cli_set(base_path + ['domain', domain, 'addnta'])
# commit changes
self.cli_commit()
# Test configured name-servers
hosts_conf = read_file(HOSTSD_FILE)
for domain in domains:
# Test 'recursion-desired' flag for the first domain only
if domain == domains[0]: key =f'\+{domain}'
else: key =f'{domain}'
tmp = get_config_value(key, file=FORWARD_FILE)
self.assertEqual(tmp, ', '.join(nameservers))
# Test 'negative trust anchor' flag for the second domain only
if domain == domains[1]:
self.assertIn(f'addNTA("{domain}", "static")', hosts_conf)
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
def test_no_rfc1918_forwarding(self):
for network in allow_from:
self.cli_set(base_path + ['allow-from', network])
for address in listen_adress:
self.cli_set(base_path + ['listen-address', address])
self.cli_set(base_path + ['no-serve-rfc1918'])
# commit changes
self.cli_commit()
# verify configuration
tmp = get_config_value('serve-rfc1918')
self.assertEqual(tmp, 'no')
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
if __name__ == '__main__':
unittest.main(verbosity=2)

File Metadata

Mime Type
text/x-script.python
Expires
Tue, Dec 16, 4:00 AM (3 h, 46 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3097040
Default Alt Text
test_service_dns_forwarding.py (7 KB)

Event Timeline