Page Menu
Home
VyOS Platform
Search
Configure Global Search
Log In
Files
F38742147
test_service_ssh.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
10 KB
Referenced Files
None
Subscribers
None
test_service_ssh.py
View Options
#!/usr/bin/env python3
#
# Copyright (C) 2019-2024 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import
os
import
paramiko
import
re
import
unittest
from
pwd
import
getpwall
from
base_vyostest_shim
import
VyOSUnitTestSHIM
from
vyos.configsession
import
ConfigSessionError
from
vyos.utils.process
import
cmd
from
vyos.utils.process
import
is_systemd_service_running
from
vyos.utils.process
import
process_named_running
from
vyos.utils.file
import
read_file
from
vyos.xml_ref
import
default_value
PROCESS_NAME
=
'sshd'
SSHD_CONF
=
'/run/sshd/sshd_config'
base_path
=
[
'service'
,
'ssh'
]
key_rsa
=
'/etc/ssh/ssh_host_rsa_key'
key_dsa
=
'/etc/ssh/ssh_host_dsa_key'
key_ed25519
=
'/etc/ssh/ssh_host_ed25519_key'
def
get_config_value
(
key
):
tmp
=
read_file
(
SSHD_CONF
)
tmp
=
re
.
findall
(
f
'
\n
?{key}\s+(.*)'
,
tmp
)
return
tmp
class
TestServiceSSH
(
VyOSUnitTestSHIM
.
TestCase
):
@classmethod
def
setUpClass
(
cls
):
super
(
TestServiceSSH
,
cls
)
.
setUpClass
()
# ensure we can also run this test on a live system - so lets clean
# out the current configuration :)
cls
.
cli_delete
(
cls
,
base_path
)
cls
.
cli_delete
(
cls
,
[
'vrf'
])
def
tearDown
(
self
):
# Check for running process
self
.
assertTrue
(
process_named_running
(
PROCESS_NAME
))
# delete testing SSH config
self
.
cli_delete
(
base_path
)
self
.
cli_delete
([
'vrf'
])
self
.
cli_commit
()
self
.
assertTrue
(
os
.
path
.
isfile
(
key_rsa
))
self
.
assertTrue
(
os
.
path
.
isfile
(
key_dsa
))
self
.
assertTrue
(
os
.
path
.
isfile
(
key_ed25519
))
# Established SSH connections remains running after service is stopped.
# We can not use process_named_running here - we rather need to check
# that the systemd service is no longer running
self
.
assertFalse
(
is_systemd_service_running
(
PROCESS_NAME
))
def
test_ssh_default
(
self
):
# Check if SSH service runs with default settings - used for checking
# behavior of <defaultValue> in XML definition
self
.
cli_set
(
base_path
)
# commit changes
self
.
cli_commit
()
# Check configured port agains CLI default value
port
=
get_config_value
(
'Port'
)
cli_default
=
default_value
(
base_path
+
[
'port'
])
self
.
assertEqual
(
port
,
cli_default
)
def
test_ssh_single_listen_address
(
self
):
# Check if SSH service can be configured and runs
self
.
cli_set
(
base_path
+
[
'port'
,
'1234'
])
self
.
cli_set
(
base_path
+
[
'disable-host-validation'
])
self
.
cli_set
(
base_path
+
[
'disable-password-authentication'
])
self
.
cli_set
(
base_path
+
[
'loglevel'
,
'verbose'
])
self
.
cli_set
(
base_path
+
[
'client-keepalive-interval'
,
'100'
])
self
.
cli_set
(
base_path
+
[
'listen-address'
,
'127.0.0.1'
])
# commit changes
self
.
cli_commit
()
# Check configured port
port
=
get_config_value
(
'Port'
)[
0
]
self
.
assertTrue
(
"1234"
in
port
)
# Check DNS usage
dns
=
get_config_value
(
'UseDNS'
)[
0
]
self
.
assertTrue
(
"no"
in
dns
)
# Check PasswordAuthentication
pwd
=
get_config_value
(
'PasswordAuthentication'
)[
0
]
self
.
assertTrue
(
"no"
in
pwd
)
# Check loglevel
loglevel
=
get_config_value
(
'LogLevel'
)[
0
]
self
.
assertTrue
(
"VERBOSE"
in
loglevel
)
# Check listen address
address
=
get_config_value
(
'ListenAddress'
)[
0
]
self
.
assertTrue
(
"127.0.0.1"
in
address
)
# Check keepalive
keepalive
=
get_config_value
(
'ClientAliveInterval'
)[
0
]
self
.
assertTrue
(
"100"
in
keepalive
)
def
test_ssh_multiple_listen_addresses
(
self
):
# Check if SSH service can be configured and runs with multiple
# listen ports and listen-addresses
ports
=
[
'22'
,
'2222'
,
'2223'
,
'2224'
]
for
port
in
ports
:
self
.
cli_set
(
base_path
+
[
'port'
,
port
])
addresses
=
[
'127.0.0.1'
,
'::1'
]
for
address
in
addresses
:
self
.
cli_set
(
base_path
+
[
'listen-address'
,
address
])
# commit changes
self
.
cli_commit
()
# Check configured port
tmp
=
get_config_value
(
'Port'
)
for
port
in
ports
:
self
.
assertIn
(
port
,
tmp
)
# Check listen address
tmp
=
get_config_value
(
'ListenAddress'
)
for
address
in
addresses
:
self
.
assertIn
(
address
,
tmp
)
def
test_ssh_vrf_single
(
self
):
vrf
=
'mgmt'
# Check if SSH service can be bound to given VRF
self
.
cli_set
(
base_path
+
[
'vrf'
,
vrf
])
# VRF does yet not exist - an error must be thrown
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
self
.
cli_set
([
'vrf'
,
'name'
,
vrf
,
'table'
,
'1338'
])
# commit changes
self
.
cli_commit
()
# Check for process in VRF
tmp
=
cmd
(
f
'ip vrf pids {vrf}'
)
self
.
assertIn
(
PROCESS_NAME
,
tmp
)
def
test_ssh_vrf_multi
(
self
):
# Check if SSH service can be bound to multiple VRFs
vrfs
=
[
'red'
,
'blue'
,
'green'
]
for
vrf
in
vrfs
:
self
.
cli_set
(
base_path
+
[
'vrf'
,
vrf
])
# VRF does yet not exist - an error must be thrown
with
self
.
assertRaises
(
ConfigSessionError
):
self
.
cli_commit
()
table
=
12345
for
vrf
in
vrfs
:
self
.
cli_set
([
'vrf'
,
'name'
,
vrf
,
'table'
,
str
(
table
)])
table
+=
1
# commit changes
self
.
cli_commit
()
# Check for process in VRF
for
vrf
in
vrfs
:
tmp
=
cmd
(
f
'ip vrf pids {vrf}'
)
self
.
assertIn
(
PROCESS_NAME
,
tmp
)
def
test_ssh_login
(
self
):
# Perform SSH login and command execution with a predefined user. The
# result (output of uname -a) must match the output if the command is
# run natively.
#
# We also try to login as an invalid user - this is not allowed to work.
test_user
=
'ssh_test'
test_pass
=
'v2i57DZs8idUwMN3VC92'
test_command
=
'uname -a'
self
.
cli_set
(
base_path
)
self
.
cli_set
([
'system'
,
'login'
,
'user'
,
test_user
,
'authentication'
,
'plaintext-password'
,
test_pass
])
# commit changes
self
.
cli_commit
()
# Login with proper credentials
output
,
error
=
self
.
ssh_send_cmd
(
test_command
,
test_user
,
test_pass
)
# verify login
self
.
assertFalse
(
error
)
self
.
assertEqual
(
output
,
cmd
(
test_command
))
# Login with invalid credentials
with
self
.
assertRaises
(
paramiko
.
ssh_exception
.
AuthenticationException
):
output
,
error
=
self
.
ssh_send_cmd
(
test_command
,
'invalid_user'
,
'invalid_password'
)
self
.
cli_delete
([
'system'
,
'login'
,
'user'
,
test_user
])
self
.
cli_commit
()
# After deletion the test user is not allowed to remain in /etc/passwd
usernames
=
[
x
[
0
]
for
x
in
getpwall
()]
self
.
assertNotIn
(
test_user
,
usernames
)
def
test_ssh_dynamic_protection
(
self
):
# check sshguard service
SSHGUARD_CONFIG
=
'/etc/sshguard/sshguard.conf'
SSHGUARD_WHITELIST
=
'/etc/sshguard/whitelist'
SSHGUARD_PROCESS
=
'sshguard'
block_time
=
'123'
detect_time
=
'1804'
port
=
'22'
threshold
=
'10'
allow_list
=
[
'192.0.2.0/24'
,
'2001:db8::/48'
]
self
.
cli_set
(
base_path
+
[
'dynamic-protection'
,
'block-time'
,
block_time
])
self
.
cli_set
(
base_path
+
[
'dynamic-protection'
,
'detect-time'
,
detect_time
])
self
.
cli_set
(
base_path
+
[
'dynamic-protection'
,
'threshold'
,
threshold
])
for
allow
in
allow_list
:
self
.
cli_set
(
base_path
+
[
'dynamic-protection'
,
'allow-from'
,
allow
])
# commit changes
self
.
cli_commit
()
# Check configured port
tmp
=
get_config_value
(
'Port'
)
self
.
assertIn
(
port
,
tmp
)
# Check sshgurad service
self
.
assertTrue
(
process_named_running
(
SSHGUARD_PROCESS
))
sshguard_lines
=
[
f
'THRESHOLD={threshold}'
,
f
'BLOCK_TIME={block_time}'
,
f
'DETECTION_TIME={detect_time}'
]
tmp_sshguard_conf
=
read_file
(
SSHGUARD_CONFIG
)
for
line
in
sshguard_lines
:
self
.
assertIn
(
line
,
tmp_sshguard_conf
)
tmp_whitelist_conf
=
read_file
(
SSHGUARD_WHITELIST
)
for
allow
in
allow_list
:
self
.
assertIn
(
allow
,
tmp_whitelist_conf
)
# Delete service ssh dynamic-protection
# but not service ssh itself
self
.
cli_delete
(
base_path
+
[
'dynamic-protection'
])
self
.
cli_commit
()
self
.
assertFalse
(
process_named_running
(
SSHGUARD_PROCESS
))
# Network Device Collaborative Protection Profile
def
test_ssh_ndcpp
(
self
):
ciphers
=
[
'aes128-cbc'
,
'aes128-ctr'
,
'aes256-cbc'
,
'aes256-ctr'
]
host_key_algs
=
[
'sk-ssh-ed25519@openssh.com'
,
'ssh-rsa'
,
'ssh-ed25519'
]
kexes
=
[
'diffie-hellman-group14-sha1'
,
'ecdh-sha2-nistp256'
,
'ecdh-sha2-nistp384'
,
'ecdh-sha2-nistp521'
]
macs
=
[
'hmac-sha1'
,
'hmac-sha2-256'
,
'hmac-sha2-512'
]
rekey_time
=
'60'
rekey_data
=
'1024'
for
cipher
in
ciphers
:
self
.
cli_set
(
base_path
+
[
'ciphers'
,
cipher
])
for
host_key
in
host_key_algs
:
self
.
cli_set
(
base_path
+
[
'hostkey-algorithm'
,
host_key
])
for
kex
in
kexes
:
self
.
cli_set
(
base_path
+
[
'key-exchange'
,
kex
])
for
mac
in
macs
:
self
.
cli_set
(
base_path
+
[
'mac'
,
mac
])
# Optional rekey parameters
self
.
cli_set
(
base_path
+
[
'rekey'
,
'data'
,
rekey_data
])
self
.
cli_set
(
base_path
+
[
'rekey'
,
'time'
,
rekey_time
])
# commit changes
self
.
cli_commit
()
ssh_lines
=
[
'Ciphers aes128-cbc,aes128-ctr,aes256-cbc,aes256-ctr'
,
'HostKeyAlgorithms sk-ssh-ed25519@openssh.com,ssh-rsa,ssh-ed25519'
,
'MACs hmac-sha1,hmac-sha2-256,hmac-sha2-512'
,
'KexAlgorithms diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521'
,
'RekeyLimit 1024M 60M'
]
tmp_sshd_conf
=
read_file
(
SSHD_CONF
)
for
line
in
ssh_lines
:
self
.
assertIn
(
line
,
tmp_sshd_conf
)
if
__name__
==
'__main__'
:
unittest
.
main
(
verbosity
=
2
)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Dec 15, 9:09 PM (1 d, 1 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3071864
Default Alt Text
test_service_ssh.py (10 KB)
Attached To
Mode
rVYOSONEX vyos-1x
Attached
Detach File
Event Timeline
Log In to Comment