Page MenuHomeVyOS Platform

No OneTemporary

Size
24 KB
Referenced Files
None
Subscribers
None
diff --git a/python/vyos/ifconfig/macsec.py b/python/vyos/ifconfig/macsec.py
index bde1d9aec..383905814 100644
--- a/python/vyos/ifconfig/macsec.py
+++ b/python/vyos/ifconfig/macsec.py
@@ -1,74 +1,74 @@
# Copyright 2020-2023 VyOS maintainers and contributors <maintainers@vyos.io>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
from vyos.ifconfig.interface import Interface
@Interface.register
class MACsecIf(Interface):
"""
MACsec is an IEEE standard (IEEE 802.1AE) for MAC security, introduced in
2006. It defines a way to establish a protocol independent connection
between two hosts with data confidentiality, authenticity and/or integrity,
using GCM-AES-128. MACsec operates on the Ethernet layer and as such is a
layer 2 protocol, which means it's designed to secure traffic within a
layer 2 network, including DHCP or ARP requests. It does not compete with
other security solutions such as IPsec (layer 3) or TLS (layer 4), as all
those solutions are used for their own specific use cases.
"""
iftype = 'macsec'
definition = {
**Interface.definition,
**{
'section': 'macsec',
'prefixes': ['macsec', ],
},
}
def _create(self):
"""
Create MACsec interface in OS kernel. Interface is administrative
down by default.
"""
# create tunnel interface
cmd = 'ip link add link {source_interface} {ifname} type {type}'.format(**self.config)
cmd += f' cipher {self.config["security"]["cipher"]}'
if 'encrypt' in self.config["security"]:
cmd += ' encrypt on'
self._cmd(cmd)
# Check if using static keys
if 'static' in self.config["security"]:
# Set static TX key
cmd = 'ip macsec add {ifname} tx sa 0 pn 1 on key 00'.format(**self.config)
cmd += f' {self.config["security"]["static"]["key"]}'
self._cmd(cmd)
for peer, peer_config in self.config["security"]["static"]["peer"].items():
if 'disable' in peer_config:
continue
# Create the address
cmd = 'ip macsec add {ifname} rx port 1 address'.format(**self.config)
cmd += f' {peer_config["mac"]}'
self._cmd(cmd)
- # Add the rx-key to the address
+ # Add the encryption key to the address
cmd += f' sa 0 pn 1 on key 01 {peer_config["key"]}'
self._cmd(cmd)
# interface is always A/D down. It needs to be enabled explicitly
self.set_admin_state('down')
diff --git a/smoketest/scripts/cli/test_interfaces_macsec.py b/smoketest/scripts/cli/test_interfaces_macsec.py
index a4e6840ca..d73895b7f 100755
--- a/smoketest/scripts/cli/test_interfaces_macsec.py
+++ b/smoketest/scripts/cli/test_interfaces_macsec.py
@@ -1,272 +1,272 @@
#!/usr/bin/env python3
#
# Copyright (C) 2020-2024 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import unittest
from base_interfaces_test import BasicInterfaceTest
from vyos.configsession import ConfigSessionError
from vyos.ifconfig import Section
from vyos.utils.file import read_file
from vyos.utils.network import get_interface_config
from vyos.utils.network import interface_exists
from vyos.utils.process import process_named_running
PROCESS_NAME = 'wpa_supplicant'
def get_config_value(interface, key):
tmp = read_file(f'/run/wpa_supplicant/{interface}.conf')
tmp = re.findall(r'\n?{}=(.*)'.format(key), tmp)
return tmp[0]
class MACsecInterfaceTest(BasicInterfaceTest.TestCase):
@classmethod
def setUpClass(cls):
cls._base_path = ['interfaces', 'macsec']
cls._options = { 'macsec0': ['source-interface eth0', 'security cipher gcm-aes-128'] }
# if we have a physical eth1 interface, add a second macsec instance
if 'eth1' in Section.interfaces('ethernet'):
macsec = { 'macsec1': [f'source-interface eth1', 'security cipher gcm-aes-128'] }
cls._options.update(macsec)
cls._interfaces = list(cls._options)
# call base-classes classmethod
super(MACsecInterfaceTest, cls).setUpClass()
def tearDown(self):
super().tearDown()
self.assertFalse(process_named_running(PROCESS_NAME))
def test_macsec_encryption(self):
# MACsec can be operating in authentication and encryption mode - both
# using different mandatory settings, lets test encryption as the basic
# authentication test has been performed using the base class tests
mak_cak = '232e44b7fda6f8e2d88a07bf78a7aff4'
mak_ckn = '40916f4b23e3d548ad27eedd2d10c6f98c2d21684699647d63d41b500dfe8836'
replay_window = '64'
for interface, option_value in self._options.items():
for option in option_value:
if option.split()[0] == 'source-interface':
src_interface = option.split()[1]
self.cli_set(self._base_path + [interface] + option.split())
# Encrypt link
self.cli_set(self._base_path + [interface, 'security', 'encrypt'])
# check validate() - Physical source interface MTU must be higher then our MTU
self.cli_set(self._base_path + [interface, 'mtu', '1500'])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(self._base_path + [interface, 'mtu'])
# check validate() - MACsec security keys mandartory when encryption is enabled
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'mka', 'cak', mak_cak])
# check validate() - MACsec security keys mandartory when encryption is enabled
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'mka', 'ckn', mak_ckn])
self.cli_set(self._base_path + [interface, 'security', 'replay-window', replay_window])
# final commit of settings
self.cli_commit()
tmp = get_config_value(src_interface, 'macsec_integ_only')
self.assertIn("0", tmp)
tmp = get_config_value(src_interface, 'mka_cak')
self.assertIn(mak_cak, tmp)
tmp = get_config_value(src_interface, 'mka_ckn')
self.assertIn(mak_ckn, tmp)
# check that the default priority of 255 is programmed
tmp = get_config_value(src_interface, 'mka_priority')
self.assertIn("255", tmp)
tmp = get_config_value(src_interface, 'macsec_replay_window')
self.assertIn(replay_window, tmp)
tmp = read_file(f'/sys/class/net/{interface}/mtu')
self.assertEqual(tmp, '1460')
# Encryption enabled?
tmp = get_interface_config(interface)
self.assertTrue(tmp['linkinfo']['info_data']['encrypt'])
# Check for running process
self.assertTrue(process_named_running(PROCESS_NAME))
def test_macsec_gcm_aes_128(self):
src_interface = 'eth0'
interface = 'macsec1'
cipher = 'gcm-aes-128'
self.cli_set(self._base_path + [interface])
# check validate() - source interface is mandatory
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'source-interface', src_interface])
# check validate() - cipher is mandatory
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher])
# final commit and verify
self.cli_commit()
self.assertTrue(interface_exists(interface))
# Verify proper cipher suite (T4537)
tmp = get_interface_config(interface)
self.assertEqual(cipher, tmp['linkinfo']['info_data']['cipher_suite'].lower())
def test_macsec_gcm_aes_256(self):
src_interface = 'eth0'
interface = 'macsec4'
cipher = 'gcm-aes-256'
self.cli_set(self._base_path + [interface])
# check validate() - source interface is mandatory
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'source-interface', src_interface])
# check validate() - cipher is mandatory
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher])
# final commit and verify
self.cli_commit()
self.assertTrue(interface_exists(interface))
# Verify proper cipher suite (T4537)
tmp = get_interface_config(interface)
self.assertEqual(cipher, tmp['linkinfo']['info_data']['cipher_suite'].lower())
def test_macsec_source_interface(self):
# Ensure source-interface can bot be part of any other bond or bridge
base_bridge = ['interfaces', 'bridge', 'br200']
base_bond = ['interfaces', 'bonding', 'bond200']
for interface, option_value in self._options.items():
for option in option_value:
self.cli_set(self._base_path + [interface] + option.split())
if option.split()[0] == 'source-interface':
src_interface = option.split()[1]
self.cli_set(base_bridge + ['member', 'interface', src_interface])
# check validate() - Source interface must not already be a member of a bridge
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(base_bridge)
self.cli_set(base_bond + ['member', 'interface', src_interface])
# check validate() - Source interface must not already be a member of a bridge
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(base_bond)
# final commit and verify
self.cli_commit()
self.assertTrue(interface_exists(interface))
def test_macsec_static_keys(self):
src_interface = 'eth0'
interface = 'macsec5'
cipher1 = 'gcm-aes-128'
cipher2 = 'gcm-aes-256'
tx_key_1 = '71a82a48eddfa12c08a19792ca20c4bb'
tx_key_2 = 'dd487b2958e855ea35a5d43a5ecb3dcfbe7889ffcb877770252feb13b734478d'
rx_key_1 = '0022d00f57e75241a230cdf7118dfcc5'
rx_key_2 = 'b7d6d7ad075e02323fdeb845217b884d3f93ff36b2cdaf6b07eeb189b877245f'
peer_mac = '00:11:22:33:44:55'
self.cli_set(self._base_path + [interface])
# Encrypt link
self.cli_set(self._base_path + [interface, 'security', 'encrypt'])
# check validate() - source interface is mandatory
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'source-interface', src_interface])
# check validate() - cipher is mandatory
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher1])
# check validate() - only static or mka config is allowed
self.cli_set(self._base_path + [interface, 'security', 'static'])
self.cli_set(self._base_path + [interface, 'security', 'mka', 'cak', tx_key_1])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(self._base_path + [interface, 'security', 'mka'])
- # check validate() - tx-key required
+ # check validate() - key required
with self.assertRaises(ConfigSessionError):
self.cli_commit()
- # check validate() - tx-key length must match cipher
+ # check validate() - key length must match cipher
self.cli_set(self._base_path + [interface, 'security', 'static', 'key', tx_key_2])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'static', 'key', tx_key_1])
# check validate() - at least one peer must be defined
with self.assertRaises(ConfigSessionError):
self.cli_commit()
- # check validate() - enabled peer must have both rx-key and MAC defined
+ # check validate() - enabled peer must have both key and MAC defined
self.cli_set(self._base_path + [interface, 'security', 'static', 'peer', 'TESTPEER'])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'static', 'peer', 'TESTPEER', 'mac', peer_mac])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_delete(self._base_path + [interface, 'security', 'static', 'peer', 'TESTPEER', 'mac'])
self.cli_set(self._base_path + [interface, 'security', 'static', 'peer', 'TESTPEER', 'key', rx_key_1])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'static', 'peer', 'TESTPEER', 'mac', peer_mac])
- # check validate() - peer rx-key length must match cipher
+ # check validate() - peer key length must match cipher
self.cli_set(self._base_path + [interface, 'security', 'cipher', cipher2])
self.cli_set(self._base_path + [interface, 'security', 'static', 'key', tx_key_2])
with self.assertRaises(ConfigSessionError):
self.cli_commit()
self.cli_set(self._base_path + [interface, 'security', 'static', 'peer', 'TESTPEER', 'key', rx_key_2])
# final commit and verify
self.cli_commit()
self.assertTrue(interface_exists(interface))
tmp = get_interface_config(interface)
self.assertEqual(cipher2, tmp['linkinfo']['info_data']['cipher_suite'].lower())
# Encryption enabled?
self.assertTrue(tmp['linkinfo']['info_data']['encrypt'])
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/conf_mode/interfaces_macsec.py b/src/conf_mode/interfaces_macsec.py
index eb0ca9a8b..3ede4377a 100755
--- a/src/conf_mode/interfaces_macsec.py
+++ b/src/conf_mode/interfaces_macsec.py
@@ -1,207 +1,207 @@
#!/usr/bin/env python3
#
# Copyright (C) 2020-2024 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
from sys import exit
from vyos.config import Config
from vyos.configdict import get_interface_dict
from vyos.configdict import is_node_changed
from vyos.configdict import is_source_interface
from vyos.configverify import verify_vrf
from vyos.configverify import verify_address
from vyos.configverify import verify_bridge_delete
from vyos.configverify import verify_mtu_ipv6
from vyos.configverify import verify_mirror_redirect
from vyos.configverify import verify_source_interface
from vyos.configverify import verify_bond_bridge_member
from vyos.ifconfig import MACsecIf
from vyos.ifconfig import Interface
from vyos.template import render
from vyos.utils.process import call
from vyos.utils.dict import dict_search
from vyos.utils.network import interface_exists
from vyos.utils.process import is_systemd_service_running
from vyos import ConfigError
from vyos import airbag
airbag.enable()
# XXX: wpa_supplicant works on the source interface
wpa_suppl_conf = '/run/wpa_supplicant/{source_interface}.conf'
# Constants
## gcm-aes-128 requires a 128bit long key - 32 characters (string) = 16byte = 128bit
GCM_AES_128_LEN: int = 32
GCM_128_KEY_ERROR = 'gcm-aes-128 requires a 128bit long key!'
## gcm-aes-256 requires a 256bit long key - 64 characters (string) = 32byte = 256bit
GCM_AES_256_LEN: int = 64
GCM_256_KEY_ERROR = 'gcm-aes-256 requires a 256bit long key!'
def get_config(config=None):
"""
Retrive CLI config as dictionary. Dictionary can never be empty, as at least the
interface name will be added or a deleted flag
"""
if config:
conf = config
else:
conf = Config()
base = ['interfaces', 'macsec']
ifname, macsec = get_interface_dict(conf, base)
# Check if interface has been removed
if 'deleted' in macsec:
source_interface = conf.return_effective_value(base + [ifname, 'source-interface'])
macsec.update({'source_interface': source_interface})
if is_node_changed(conf, base + [ifname, 'security']):
macsec.update({'shutdown_required': {}})
if is_node_changed(conf, base + [ifname, 'source_interface']):
macsec.update({'shutdown_required': {}})
if 'source_interface' in macsec:
tmp = is_source_interface(conf, macsec['source_interface'], ['macsec', 'pseudo-ethernet'])
if tmp and tmp != ifname: macsec.update({'is_source_interface' : tmp})
return macsec
def verify(macsec):
if 'deleted' in macsec:
verify_bridge_delete(macsec)
return None
verify_source_interface(macsec)
verify_vrf(macsec)
verify_mtu_ipv6(macsec)
verify_address(macsec)
verify_bond_bridge_member(macsec)
verify_mirror_redirect(macsec)
if dict_search('security.cipher', macsec) == None:
raise ConfigError('Cipher suite must be set for MACsec "{ifname}"'.format(**macsec))
if dict_search('security.encrypt', macsec) != None:
# Check that only static or MKA config is present
if dict_search('security.static', macsec) != None and (dict_search('security.mka.cak', macsec) != None or dict_search('security.mka.ckn', macsec) != None):
raise ConfigError('Only static or MKA can be used!')
# Logic to check static configuration
if dict_search('security.static', macsec) != None:
- # tx-key must be defined
+ # key must be defined
if dict_search('security.static.key', macsec) == None:
- raise ConfigError('Static MACsec tx-key must be defined.')
+ raise ConfigError('Static MACsec key must be defined.')
tx_len = len(dict_search('security.static.key', macsec))
if dict_search('security.cipher', macsec) == 'gcm-aes-128' and tx_len != GCM_AES_128_LEN:
raise ConfigError(GCM_128_KEY_ERROR)
if dict_search('security.cipher', macsec) == 'gcm-aes-256' and tx_len != GCM_AES_256_LEN:
raise ConfigError(GCM_256_KEY_ERROR)
# Make sure at least one peer is defined
if 'peer' not in macsec['security']['static']:
raise ConfigError('Must have at least one peer defined for static MACsec')
- # For every enabled peer, make sure a MAC and rx-key is defined
+ # For every enabled peer, make sure a MAC and key is defined
for peer, peer_config in macsec['security']['static']['peer'].items():
if 'disable' not in peer_config and ('mac' not in peer_config or 'key' not in peer_config):
- raise ConfigError('Every enabled MACsec static peer must have a MAC address and rx-key defined.')
+ raise ConfigError('Every enabled MACsec static peer must have a MAC address and key defined!')
- # check rx-key length against cipher suite
+ # check key length against cipher suite
rx_len = len(peer_config['key'])
if dict_search('security.cipher', macsec) == 'gcm-aes-128' and rx_len != GCM_AES_128_LEN:
raise ConfigError(GCM_128_KEY_ERROR)
if dict_search('security.cipher', macsec) == 'gcm-aes-256' and rx_len != GCM_AES_256_LEN:
raise ConfigError(GCM_256_KEY_ERROR)
# Logic to check MKA configuration
else:
if dict_search('security.mka.cak', macsec) == None or dict_search('security.mka.ckn', macsec) == None:
raise ConfigError('Missing mandatory MACsec security keys as encryption is enabled!')
cak_len = len(dict_search('security.mka.cak', macsec))
if dict_search('security.cipher', macsec) == 'gcm-aes-128' and cak_len != GCM_AES_128_LEN:
raise ConfigError(GCM_128_KEY_ERROR)
elif dict_search('security.cipher', macsec) == 'gcm-aes-256' and cak_len != GCM_AES_256_LEN:
raise ConfigError(GCM_256_KEY_ERROR)
if 'source_interface' in macsec:
# MACsec adds a 40 byte overhead (32 byte MACsec + 8 bytes VLAN 802.1ad
# and 802.1q) - we need to check the underlaying MTU if our configured
# MTU is at least 40 bytes less then the MTU of our physical interface.
lower_mtu = Interface(macsec['source_interface']).get_mtu()
if lower_mtu < (int(macsec['mtu']) + 40):
raise ConfigError('MACsec overhead does not fit into underlaying device MTU,\n' \
f'{lower_mtu} bytes is too small!')
return None
def generate(macsec):
# Only generate wpa_supplicant config if using MKA
if dict_search('security.mka.cak', macsec):
render(wpa_suppl_conf.format(**macsec), 'macsec/wpa_supplicant.conf.j2', macsec)
return None
def apply(macsec):
systemd_service = 'wpa_supplicant-macsec@{source_interface}'.format(**macsec)
# Remove macsec interface on deletion or mandatory parameter change
if 'deleted' in macsec or 'shutdown_required' in macsec:
call(f'systemctl stop {systemd_service}')
if interface_exists(macsec['ifname']):
tmp = MACsecIf(**macsec)
tmp.remove()
if 'deleted' in macsec:
# delete configuration on interface removal
if os.path.isfile(wpa_suppl_conf.format(**macsec)):
os.unlink(wpa_suppl_conf.format(**macsec))
return None
# It is safe to "re-create" the interface always, there is a sanity
# check that the interface will only be create if its non existent
i = MACsecIf(**macsec)
i.update(macsec)
# Only reload/restart if using MKA
if dict_search('security.mka.cak', macsec):
if not is_systemd_service_running(systemd_service) or 'shutdown_required' in macsec:
call(f'systemctl reload-or-restart {systemd_service}')
return None
if __name__ == '__main__':
try:
c = get_config()
verify(c)
generate(c)
apply(c)
except ConfigError as e:
print(e)
exit(1)

File Metadata

Mime Type
text/x-diff
Expires
Tue, Dec 9, 10:51 PM (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3110925
Default Alt Text
(24 KB)

Event Timeline