diff --git a/interface-definitions/include/pki/openssh-key.xml.i b/interface-definitions/include/pki/openssh-key.xml.i
new file mode 100644
index 000000000..8f005d077
--- /dev/null
+++ b/interface-definitions/include/pki/openssh-key.xml.i
@@ -0,0 +1,14 @@
+<!-- include start from pki/openssh-key.xml.i -->
+<leafNode name="key">
+  <properties>
+    <help>OpenSSH key in PKI configuration</help>
+    <completionHelp>
+      <path>pki openssh</path>
+    </completionHelp>
+    <valueHelp>
+      <format>txt</format>
+      <description>Name of OpenSSH key in PKI configuration</description>
+    </valueHelp>
+  </properties>
+</leafNode>
+<!-- include end -->
diff --git a/interface-definitions/pki.xml.in b/interface-definitions/pki.xml.in
index 617bdd584..7a0b073b4 100644
--- a/interface-definitions/pki.xml.in
+++ b/interface-definitions/pki.xml.in
@@ -1,248 +1,287 @@
 <?xml version="1.0"?>
 <interfaceDefinition>
   <node name="pki" owner="${vyos_conf_scripts_dir}/pki.py">
     <properties>
       <help>VyOS PKI configuration</help>
       <priority>300</priority>
     </properties>
     <children>
       <tagNode name="ca">
         <properties>
           <help>Certificate Authority</help>
           <constraint>
             #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i>
           </constraint>
         </properties>
         <children>
           #include <include/pki/cli-certificate-base64.xml.i>
           #include <include/generic-description.xml.i>
           <node name="private">
             <properties>
               <help>CA private key in PEM format</help>
             </properties>
             <children>
               #include <include/pki/cli-private-key-base64.xml.i>
               #include <include/pki/password-protected.xml.i>
             </children>
           </node>
           <leafNode name="crl">
             <properties>
               <help>Certificate revocation list in PEM format</help>
               <constraint>
                 <validator name="base64"/>
               </constraint>
               <constraintErrorMessage>CRL is not base64-encoded</constraintErrorMessage>
               <multi/>
             </properties>
           </leafNode>
           #include <include/pki/cli-revoke.xml.i>
         </children>
       </tagNode>
       <tagNode name="certificate">
         <properties>
           <help>Certificate</help>
           <constraint>
             #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i>
           </constraint>
         </properties>
         <children>
           #include <include/pki/cli-certificate-base64.xml.i>
           <node name="acme">
             <properties>
               <help>Automatic Certificate Management Environment (ACME) request</help>
             </properties>
             <children>
               #include <include/url-http-https.xml.i>
               <leafNode name="url">
                 <defaultValue>https://acme-v02.api.letsencrypt.org/directory</defaultValue>
               </leafNode>
               <leafNode name="domain-name">
                 <properties>
                   <help>Domain Name</help>
                   <constraint>
                     <validator name="fqdn"/>
                   </constraint>
                   <constraintErrorMessage>Invalid domain name (RFC 1123 section 2).\nMay only contain letters, numbers and .-_</constraintErrorMessage>
                   <multi/>
                 </properties>
               </leafNode>
               <leafNode name="email">
                 <properties>
                   <help>Email address to associate with certificate</help>
                   <constraint>
                     #include <include/constraint/email.xml.i>
                   </constraint>
                 </properties>
               </leafNode>
               #include <include/listen-address-ipv4-single.xml.i>
               <leafNode name="rsa-key-size">
                 <properties>
                   <help>Size of the RSA key</help>
                   <completionHelp>
                     <list>2048 3072 4096</list>
                   </completionHelp>
                   <valueHelp>
                     <format>2048</format>
                     <description>RSA key length 2048 bit</description>
                   </valueHelp>
                   <valueHelp>
                     <format>3072</format>
                     <description>RSA key length 3072 bit</description>
                   </valueHelp>
                   <valueHelp>
                     <format>4096</format>
                     <description>RSA key length 4096 bit</description>
                   </valueHelp>
                   <constraint>
                     <regex>(2048|3072|4096)</regex>
                   </constraint>
                 </properties>
                 <defaultValue>2048</defaultValue>
               </leafNode>
             </children>
           </node>
           #include <include/generic-description.xml.i>
           <node name="private">
             <properties>
               <help>Certificate private key</help>
             </properties>
             <children>
               #include <include/pki/cli-private-key-base64.xml.i>
               #include <include/pki/password-protected.xml.i>
             </children>
           </node>
           #include <include/pki/cli-revoke.xml.i>
         </children>
       </tagNode>
       <tagNode name="dh">
         <properties>
           <help>Diffie-Hellman parameters</help>
           <constraint>
             #include <include/constraint/alpha-numeric-hyphen-underscore-dot.xml.i>
           </constraint>
         </properties>
         <children>
           <leafNode name="parameters">
             <properties>
               <help>DH parameters in PEM format</help>
               <constraint>
                 <validator name="base64"/>
               </constraint>
               <constraintErrorMessage>DH parameters are not base64-encoded</constraintErrorMessage>
             </properties>
           </leafNode>
         </children>
       </tagNode>
       <tagNode name="key-pair">
         <properties>
           <help>Public and private keys</help>
         </properties>
         <children>
           <node name="public">
             <properties>
               <help>Public key</help>
             </properties>
             <children>
               #include <include/pki/cli-public-key-base64.xml.i>
             </children>
           </node>
           <node name="private">
             <properties>
               <help>Private key</help>
             </properties>
             <children>
               #include <include/pki/cli-private-key-base64.xml.i>
               #include <include/pki/password-protected.xml.i>
             </children>
           </node>
         </children>
       </tagNode>
+      <tagNode name="openssh">
+        <properties>
+          <help>OpenSSH public and private keys</help>
+        </properties>
+        <children>
+          <node name="public">
+            <properties>
+              <help>Public key</help>
+            </properties>
+            <children>
+              #include <include/pki/cli-public-key-base64.xml.i>
+              <leafNode name="type">
+                <properties>
+                  <help>SSH public key type</help>
+                  <completionHelp>
+                    <list>ssh-rsa</list>
+                  </completionHelp>
+                  <valueHelp>
+                    <format>ssh-rsa</format>
+                    <description>Key pair based on RSA algorithm</description>
+                  </valueHelp>
+                  <constraint>
+                    <regex>(ssh-rsa)</regex>
+                  </constraint>
+                </properties>
+              </leafNode>
+            </children>
+          </node>
+          <node name="private">
+            <properties>
+              <help>Private key</help>
+            </properties>
+            <children>
+              #include <include/pki/cli-private-key-base64.xml.i>
+              #include <include/pki/password-protected.xml.i>
+            </children>
+          </node>
+        </children>
+      </tagNode>
       <tagNode name="openssh">
         <properties>
           <help>OpenSSH public and private keys</help>
         </properties>
         <children>
           <node name="public">
             <properties>
               <help>Public key</help>
             </properties>
             <children>
               #include <include/pki/cli-public-key-base64.xml.i>
             </children>
           </node>
           <node name="private">
             <properties>
               <help>Private key</help>
             </properties>
             <children>
               #include <include/pki/cli-private-key-base64.xml.i>
               #include <include/pki/password-protected.xml.i>
             </children>
           </node>
         </children>
       </tagNode>
       <node name="openvpn">
         <properties>
           <help>OpenVPN keys</help>
         </properties>
         <children>
           <tagNode name="shared-secret">
             <properties>
               <help>OpenVPN shared secret key</help>
             </properties>
             <children>
               <leafNode name="key">
                 <properties>
                   <help>OpenVPN shared secret key data</help>
                 </properties>
               </leafNode>
               <leafNode name="version">
                 <properties>
                   <help>OpenVPN shared secret key version</help>
                 </properties>
               </leafNode>
             </children>
           </tagNode>
         </children>
       </node>
       <node name="x509">
         <properties>
           <help>X509 Settings</help>
         </properties>
         <children>
           <node name="default">
             <properties>
               <help>X509 Default Values</help>
             </properties>
             <children>
               <leafNode name="country">
                 <properties>
                   <help>Default country</help>
                 </properties>
                 <defaultValue>GB</defaultValue>
               </leafNode>
               <leafNode name="state">
                 <properties>
                   <help>Default state</help>
                 </properties>
                 <defaultValue>Some-State</defaultValue>
               </leafNode>
               <leafNode name="locality">
                 <properties>
                   <help>Default locality</help>
                 </properties>
                 <defaultValue>Some-City</defaultValue>
               </leafNode>
               <leafNode name="organization">
                 <properties>
                   <help>Default organization</help>
                 </properties>
                 <defaultValue>VyOS</defaultValue>
               </leafNode>
             </children>
           </node>
         </children>
       </node>
     </children>
   </node>
 </interfaceDefinition>
diff --git a/python/vyos/pki.py b/python/vyos/pki.py
index 792e24b76..02dece471 100644
--- a/python/vyos/pki.py
+++ b/python/vyos/pki.py
@@ -1,432 +1,455 @@
 #!/usr/bin/env python3
 #
-# Copyright (C) 2023 VyOS maintainers and contributors
+# Copyright (C) 2023-2024 VyOS maintainers and contributors
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License version 2 or later as
 # published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import datetime
 import ipaddress
 
 from cryptography import x509
 from cryptography.exceptions import InvalidSignature
 from cryptography.x509.extensions import ExtensionNotFound
-from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID, ExtensionOID
+from cryptography.x509.oid import NameOID
+from cryptography.x509.oid import ExtendedKeyUsageOID
+from cryptography.x509.oid import ExtensionOID
 from cryptography.hazmat.primitives import hashes
 from cryptography.hazmat.primitives import serialization
 from cryptography.hazmat.primitives.asymmetric import dh
 from cryptography.hazmat.primitives.asymmetric import dsa
 from cryptography.hazmat.primitives.asymmetric import ec
 from cryptography.hazmat.primitives.asymmetric import padding
 from cryptography.hazmat.primitives.asymmetric import rsa
 
 CERT_BEGIN='-----BEGIN CERTIFICATE-----\n'
 CERT_END='\n-----END CERTIFICATE-----'
 KEY_BEGIN='-----BEGIN PRIVATE KEY-----\n'
 KEY_END='\n-----END PRIVATE KEY-----'
 KEY_ENC_BEGIN='-----BEGIN ENCRYPTED PRIVATE KEY-----\n'
 KEY_ENC_END='\n-----END ENCRYPTED PRIVATE KEY-----'
 KEY_PUB_BEGIN='-----BEGIN PUBLIC KEY-----\n'
 KEY_PUB_END='\n-----END PUBLIC KEY-----'
 CRL_BEGIN='-----BEGIN X509 CRL-----\n'
 CRL_END='\n-----END X509 CRL-----'
 CSR_BEGIN='-----BEGIN CERTIFICATE REQUEST-----\n'
 CSR_END='\n-----END CERTIFICATE REQUEST-----'
 DH_BEGIN='-----BEGIN DH PARAMETERS-----\n'
 DH_END='\n-----END DH PARAMETERS-----'
 OVPN_BEGIN = '-----BEGIN OpenVPN Static key V{0}-----\n'
 OVPN_END = '\n-----END OpenVPN Static key V{0}-----'
+OPENSSH_KEY_BEGIN='-----BEGIN OPENSSH PRIVATE KEY-----\n'
+OPENSSH_KEY_END='\n-----END OPENSSH PRIVATE KEY-----'
 
 # Print functions
 
 encoding_map = {
     'PEM': serialization.Encoding.PEM,
     'OpenSSH': serialization.Encoding.OpenSSH
 }
 
 public_format_map = {
     'SubjectPublicKeyInfo': serialization.PublicFormat.SubjectPublicKeyInfo,
     'OpenSSH': serialization.PublicFormat.OpenSSH
 }
 
 private_format_map = {
     'PKCS8': serialization.PrivateFormat.PKCS8,
     'OpenSSH': serialization.PrivateFormat.OpenSSH
 }
 
 hash_map = {
     'sha256': hashes.SHA256,
     'sha384': hashes.SHA384,
     'sha512': hashes.SHA512,
 }
 
 def get_certificate_fingerprint(cert, hash):
     hash_algorithm = hash_map[hash]()
     fp = cert.fingerprint(hash_algorithm)
 
     return fp.hex(':').upper()
 
 def encode_certificate(cert):
     return cert.public_bytes(encoding=serialization.Encoding.PEM).decode('utf-8')
 
 def encode_public_key(cert, encoding='PEM', key_format='SubjectPublicKeyInfo'):
     if encoding not in encoding_map:
         encoding = 'PEM'
     if key_format not in public_format_map:
         key_format = 'SubjectPublicKeyInfo'
     return cert.public_bytes(
         encoding=encoding_map[encoding],
         format=public_format_map[key_format]).decode('utf-8')
 
 def encode_private_key(private_key, encoding='PEM', key_format='PKCS8', passphrase=None):
     if encoding not in encoding_map:
         encoding = 'PEM'
     if key_format not in private_format_map:
         key_format = 'PKCS8'
     encryption = serialization.NoEncryption() if not passphrase else serialization.BestAvailableEncryption(bytes(passphrase, 'utf-8'))
     return private_key.private_bytes(
         encoding=encoding_map[encoding],
         format=private_format_map[key_format],
         encryption_algorithm=encryption).decode('utf-8')
 
 def encode_dh_parameters(dh_parameters):
     return dh_parameters.parameter_bytes(
         encoding=serialization.Encoding.PEM,
         format=serialization.ParameterFormat.PKCS3).decode('utf-8')
 
 # EC Helper
 
 def get_elliptic_curve(size):
     curve_func = None
     name = f'SECP{size}R1'
     if hasattr(ec, name):
         curve_func = getattr(ec, name)
     else:
         curve_func = ec.SECP256R1() # Default to SECP256R1
     return curve_func()
 
 # Creation functions
 
 def create_private_key(key_type, key_size=None):
     private_key = None
     if key_type == 'rsa':
         private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
     elif key_type == 'dsa':
         private_key = dsa.generate_private_key(key_size=key_size)
     elif key_type == 'ec':
         curve = get_elliptic_curve(key_size)
         private_key = ec.generate_private_key(curve)
     return private_key
 
 def create_certificate_request(subject, private_key, subject_alt_names=[]):
     subject_obj = x509.Name([
         x509.NameAttribute(NameOID.COUNTRY_NAME, subject['country']),
         x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, subject['state']),
         x509.NameAttribute(NameOID.LOCALITY_NAME, subject['locality']),
         x509.NameAttribute(NameOID.ORGANIZATION_NAME, subject['organization']),
         x509.NameAttribute(NameOID.COMMON_NAME, subject['common_name'])])
 
     builder = x509.CertificateSigningRequestBuilder() \
         .subject_name(subject_obj)
 
     if subject_alt_names:
         alt_names = []
         for obj in subject_alt_names:
             if isinstance(obj, ipaddress.IPv4Address) or isinstance(obj, ipaddress.IPv6Address):
                 alt_names.append(x509.IPAddress(obj))
             elif isinstance(obj, str):
                 alt_names.append(x509.DNSName(obj))
         if alt_names:
             builder = builder.add_extension(x509.SubjectAlternativeName(alt_names), critical=False)
 
     return builder.sign(private_key, hashes.SHA256())
 
 def add_key_identifier(ca_cert):
     try:
         ski_ext = ca_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
         return x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ski_ext.value)
     except:
         return x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key())
 
 def create_certificate(cert_req, ca_cert, ca_private_key, valid_days=365, cert_type='server', is_ca=False, is_sub_ca=False):
     ext_key_usage = []
     if is_ca:
         ext_key_usage = [ExtendedKeyUsageOID.CLIENT_AUTH, ExtendedKeyUsageOID.SERVER_AUTH]
     elif cert_type == 'client':
         ext_key_usage = [ExtendedKeyUsageOID.CLIENT_AUTH]
     elif cert_type == 'server':
         ext_key_usage = [ExtendedKeyUsageOID.SERVER_AUTH]
 
     builder = x509.CertificateBuilder() \
         .subject_name(cert_req.subject) \
         .issuer_name(ca_cert.subject) \
         .public_key(cert_req.public_key()) \
         .serial_number(x509.random_serial_number()) \
         .not_valid_before(datetime.datetime.utcnow()) \
         .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=int(valid_days)))
 
     builder = builder.add_extension(x509.BasicConstraints(ca=is_ca, path_length=0 if is_sub_ca else None), critical=True)
     builder = builder.add_extension(x509.KeyUsage(
         digital_signature=True,
         content_commitment=False,
         key_encipherment=False,
         data_encipherment=False,
         key_agreement=False,
         key_cert_sign=is_ca,
         crl_sign=is_ca,
         encipher_only=False,
         decipher_only=False), critical=True)
     builder = builder.add_extension(x509.ExtendedKeyUsage(ext_key_usage), critical=False)
     builder = builder.add_extension(x509.SubjectKeyIdentifier.from_public_key(cert_req.public_key()), critical=False)
 
     if not is_ca or is_sub_ca:
         builder = builder.add_extension(add_key_identifier(ca_cert), critical=False)
 
     for ext in cert_req.extensions:
         builder = builder.add_extension(ext.value, critical=False)
 
     return builder.sign(ca_private_key, hashes.SHA256())
 
 def create_certificate_revocation_list(ca_cert, ca_private_key, serial_numbers=[]):
     if not serial_numbers:
         return False
 
     builder = x509.CertificateRevocationListBuilder() \
         .issuer_name(ca_cert.subject) \
         .last_update(datetime.datetime.today()) \
         .next_update(datetime.datetime.today() + datetime.timedelta(1, 0, 0))
 
     for serial_number in serial_numbers:
         revoked_cert = x509.RevokedCertificateBuilder() \
             .serial_number(serial_number) \
             .revocation_date(datetime.datetime.today()) \
             .build()
         builder = builder.add_revoked_certificate(revoked_cert)
 
     return builder.sign(private_key=ca_private_key, algorithm=hashes.SHA256())
 
 def create_dh_parameters(bits=2048):
     if not bits or bits < 512:
         print("Invalid DH parameter key size")
         return False
 
     return dh.generate_parameters(generator=2, key_size=int(bits))
 
 # Wrap functions
 
 def wrap_public_key(raw_data):
     return KEY_PUB_BEGIN + raw_data + KEY_PUB_END
 
 def wrap_private_key(raw_data, passphrase=None):
     return (KEY_ENC_BEGIN if passphrase else KEY_BEGIN) + raw_data + (KEY_ENC_END if passphrase else KEY_END)
 
+def wrap_openssh_public_key(raw_data, type):
+    return f'{type} {raw_data}'
+
+def wrap_openssh_private_key(raw_data):
+    return OPENSSH_KEY_BEGIN + raw_data +  OPENSSH_KEY_END
+
 def wrap_certificate_request(raw_data):
     return CSR_BEGIN + raw_data + CSR_END
 
 def wrap_certificate(raw_data):
     return CERT_BEGIN + raw_data + CERT_END
 
 def wrap_crl(raw_data):
     return CRL_BEGIN + raw_data + CRL_END
 
 def wrap_dh_parameters(raw_data):
     return DH_BEGIN + raw_data + DH_END
 
 def wrap_openvpn_key(raw_data, version='1'):
     return OVPN_BEGIN.format(version) + raw_data + OVPN_END.format(version)
 
 # Load functions
-
 def load_public_key(raw_data, wrap_tags=True):
     if wrap_tags:
         raw_data = wrap_public_key(raw_data)
 
     try:
         return serialization.load_pem_public_key(bytes(raw_data, 'utf-8'))
     except ValueError:
         return False
 
 def load_private_key(raw_data, passphrase=None, wrap_tags=True):
     if wrap_tags:
         raw_data = wrap_private_key(raw_data, passphrase)
 
     if passphrase is not None:
         passphrase = bytes(passphrase, 'utf-8')
 
     try:
         return serialization.load_pem_private_key(bytes(raw_data, 'utf-8'), password=passphrase)
     except ValueError:
         return False
 
+def load_openssh_public_key(raw_data, type):
+    try:
+        return serialization.load_ssh_public_key(bytes(f'{type} {raw_data}', 'utf-8'))
+    except ValueError:
+        return False
+
+def load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True):
+    if wrap_tags:
+        raw_data = wrap_openssh_private_key(raw_data)
+
+    try:
+        return serialization.load_ssh_private_key(bytes(raw_data, 'utf-8'), password=passphrase)
+    except ValueError:
+        return False
+
 def load_certificate_request(raw_data, wrap_tags=True):
     if wrap_tags:
         raw_data = wrap_certificate_request(raw_data)
 
     try:
         return x509.load_pem_x509_csr(bytes(raw_data, 'utf-8'))
     except ValueError:
         return False
 
 def load_certificate(raw_data, wrap_tags=True):
     if wrap_tags:
         raw_data = wrap_certificate(raw_data)
 
     try:
         return x509.load_pem_x509_certificate(bytes(raw_data, 'utf-8'))
     except ValueError:
         return False
 
 def load_crl(raw_data, wrap_tags=True):
     if wrap_tags:
         raw_data = wrap_crl(raw_data)
 
     try:
         return x509.load_pem_x509_crl(bytes(raw_data, 'utf-8'))
     except ValueError:
         return False
 
 def load_dh_parameters(raw_data, wrap_tags=True):
     if wrap_tags:
         raw_data = wrap_dh_parameters(raw_data)
 
     try:
         return serialization.load_pem_parameters(bytes(raw_data, 'utf-8'))
     except ValueError:
         return False
 
 # Verify
 
 def is_ca_certificate(cert):
     if not cert:
         return False
 
     try:
         ext = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS)
         return ext.value.ca
     except ExtensionNotFound:
         return False
 
 def verify_certificate(cert, ca_cert):
     # Verify certificate was signed by specified CA
     if ca_cert.subject != cert.issuer:
         return False
 
     ca_public_key = ca_cert.public_key()
     try:
         if isinstance(ca_public_key, rsa.RSAPublicKeyWithSerialization):
             ca_public_key.verify(
                 cert.signature,
                 cert.tbs_certificate_bytes,
                 padding=padding.PKCS1v15(),
                 algorithm=cert.signature_hash_algorithm)
         elif isinstance(ca_public_key, dsa.DSAPublicKeyWithSerialization):
             ca_public_key.verify(
                 cert.signature,
                 cert.tbs_certificate_bytes,
                 algorithm=cert.signature_hash_algorithm)
         elif isinstance(ca_public_key, ec.EllipticCurvePublicKeyWithSerialization):
             ca_public_key.verify(
                 cert.signature,
                 cert.tbs_certificate_bytes,
                 signature_algorithm=ec.ECDSA(cert.signature_hash_algorithm))
         else:
             return False # We cannot verify it
         return True
     except InvalidSignature:
         return False
 
 def verify_crl(crl, ca_cert):
     # Verify CRL was signed by specified CA
     if ca_cert.subject != crl.issuer:
         return False
 
     ca_public_key = ca_cert.public_key()
     try:
         if isinstance(ca_public_key, rsa.RSAPublicKeyWithSerialization):
             ca_public_key.verify(
                 crl.signature,
                 crl.tbs_certlist_bytes,
                 padding=padding.PKCS1v15(),
                 algorithm=crl.signature_hash_algorithm)
         elif isinstance(ca_public_key, dsa.DSAPublicKeyWithSerialization):
             ca_public_key.verify(
                 crl.signature,
                 crl.tbs_certlist_bytes,
                 algorithm=crl.signature_hash_algorithm)
         elif isinstance(ca_public_key, ec.EllipticCurvePublicKeyWithSerialization):
             ca_public_key.verify(
                 crl.signature,
                 crl.tbs_certlist_bytes,
                 signature_algorithm=ec.ECDSA(crl.signature_hash_algorithm))
         else:
             return False # We cannot verify it
         return True
     except InvalidSignature:
         return False
 
 def verify_ca_chain(sorted_names, pki_node):
     if len(sorted_names) == 1: # Single cert, no chain
         return True
 
     for name in sorted_names:
         cert = load_certificate(pki_node[name]['certificate'])
         verified = False
         for ca_name in sorted_names:
             if name == ca_name:
                 continue
             ca_cert = load_certificate(pki_node[ca_name]['certificate'])
             if verify_certificate(cert, ca_cert):
                 verified = True
                 break
         if not verified and name != sorted_names[-1]:
             # Only permit top-most certificate to fail verify (e.g. signed by public CA not explicitly in chain)
             return False
     return True
 
 # Certificate chain
 
 def find_parent(cert, ca_certs):
     for ca_cert in ca_certs:
         if verify_certificate(cert, ca_cert):
             return ca_cert
     return None
 
 def find_chain(cert, ca_certs):
     remaining = ca_certs.copy()
     chain = [cert]
 
     while remaining:
         parent = find_parent(chain[-1], remaining)
         if parent is None:
             # No parent in the list of remaining certificates or there's a circular dependency
             break
         elif parent == chain[-1]:
             # Self-signed: must be root CA (end of chain)
             break
         else:
             remaining.remove(parent)
             chain.append(parent)
 
     return chain
 
 def sort_ca_chain(ca_names, pki_node):
     def ca_cmp(ca_name1, ca_name2, pki_node):
         cert1 = load_certificate(pki_node[ca_name1]['certificate'])
         cert2 = load_certificate(pki_node[ca_name2]['certificate'])
 
         if verify_certificate(cert1, cert2): # cert1 is child of cert2
             return -1
         return 1
 
     from functools import cmp_to_key
     return sorted(ca_names, key=cmp_to_key(lambda cert1, cert2: ca_cmp(cert1, cert2, pki_node)))
-
diff --git a/src/conf_mode/pki.py b/src/conf_mode/pki.py
index 4be40e99e..2d076e42d 100755
--- a/src/conf_mode/pki.py
+++ b/src/conf_mode/pki.py
@@ -1,432 +1,464 @@
 #!/usr/bin/env python3
 #
 # Copyright (C) 2021-2024 VyOS maintainers and contributors
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License version 2 or later as
 # published by the Free Software Foundation.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
 
 from sys import argv
 from sys import exit
 
 from vyos.config import Config
 from vyos.config import config_dict_merge
 from vyos.configdep import set_dependents
 from vyos.configdep import call_dependents
 from vyos.configdict import node_changed
 from vyos.configdiff import Diff
 from vyos.defaults import directories
 from vyos.pki import is_ca_certificate
 from vyos.pki import load_certificate
 from vyos.pki import load_public_key
+from vyos.pki import load_openssh_public_key
+from vyos.pki import load_openssh_private_key
 from vyos.pki import load_private_key
 from vyos.pki import load_crl
 from vyos.pki import load_dh_parameters
 from vyos.utils.boot import boot_configuration_complete
 from vyos.utils.dict import dict_search
 from vyos.utils.dict import dict_search_args
 from vyos.utils.dict import dict_search_recursive
 from vyos.utils.process import call
 from vyos.utils.process import cmd
 from vyos.utils.process import is_systemd_service_active
 from vyos import ConfigError
 from vyos import airbag
 airbag.enable()
 
 vyos_certbot_dir = directories['certbot']
 
 # keys to recursively search for under specified path
 sync_search = [
     {
         'keys': ['certificate'],
         'path': ['service', 'https'],
     },
     {
         'keys': ['certificate', 'ca_certificate'],
         'path': ['interfaces', 'ethernet'],
     },
     {
         'keys': ['certificate', 'ca_certificate', 'dh_params', 'shared_secret_key', 'auth_key', 'crypt_key'],
         'path': ['interfaces', 'openvpn'],
     },
     {
         'keys': ['ca_certificate'],
         'path': ['interfaces', 'sstpc'],
     },
     {
         'keys': ['certificate', 'ca_certificate', 'local_key', 'remote_key'],
         'path': ['vpn', 'ipsec'],
     },
     {
         'keys': ['certificate', 'ca_certificate'],
         'path': ['vpn', 'openconnect'],
     },
     {
         'keys': ['certificate', 'ca_certificate'],
         'path': ['vpn', 'sstp'],
     }
 ]
 
 # key from other config nodes -> key in pki['changed'] and pki
 sync_translate = {
     'certificate': 'certificate',
     'ca_certificate': 'ca',
     'dh_params': 'dh',
     'local_key': 'key_pair',
     'remote_key': 'key_pair',
     'shared_secret_key': 'openvpn',
     'auth_key': 'openvpn',
     'crypt_key': 'openvpn'
 }
 
 def certbot_delete(certificate):
     if not boot_configuration_complete():
         return
     if os.path.exists(f'{vyos_certbot_dir}/renewal/{certificate}.conf'):
         cmd(f'certbot delete --non-interactive --config-dir {vyos_certbot_dir} --cert-name {certificate}')
 
 def certbot_request(name: str, config: dict, dry_run: bool=True):
     # We do not call certbot when booting the system - there is no need to do so and
     # request new certificates during boot/image upgrade as the certbot configuration
     # is stored persistent under /config - thus we do not open the door to transient
     # errors
     if not boot_configuration_complete():
         return
 
     domains = '--domains ' + ' --domains '.join(config['domain_name'])
     tmp = f'certbot certonly --non-interactive --config-dir {vyos_certbot_dir} --cert-name {name} '\
           f'--standalone --agree-tos --no-eff-email --expand --server {config["url"]} '\
           f'--email {config["email"]} --key-type rsa --rsa-key-size {config["rsa_key_size"]} '\
           f'{domains}'
     if 'listen_address' in config:
         tmp += f' --http-01-address {config["listen_address"]}'
     # verify() does not need to actually request a cert but only test for plausability
     if dry_run:
         tmp += ' --dry-run'
 
     cmd(tmp, raising=ConfigError, message=f'ACME certbot request failed for "{name}"!')
 
 def get_config(config=None):
     if config:
         conf = config
     else:
         conf = Config()
     base = ['pki']
 
     pki = conf.get_config_dict(base, key_mangling=('-', '_'),
                                      get_first_key=True,
                                      no_tag_node_value_mangle=True)
 
     if len(argv) > 1 and argv[1] == 'certbot_renew':
         pki['certbot_renew'] = {}
 
     tmp = node_changed(conf, base + ['ca'], recursive=True)
     if tmp:
         if 'changed' not in pki: pki.update({'changed':{}})
         pki['changed'].update({'ca' : tmp})
 
     tmp = node_changed(conf, base + ['certificate'], recursive=True)
     if tmp:
         if 'changed' not in pki: pki.update({'changed':{}})
         pki['changed'].update({'certificate' : tmp})
 
     tmp = node_changed(conf, base + ['dh'], recursive=True)
     if tmp:
         if 'changed' not in pki: pki.update({'changed':{}})
         pki['changed'].update({'dh' : tmp})
 
     tmp = node_changed(conf, base + ['key-pair'], recursive=True)
     if tmp:
         if 'changed' not in pki: pki.update({'changed':{}})
         pki['changed'].update({'key_pair' : tmp})
 
+    tmp = node_changed(conf, base + ['openssh'], recursive=True)
+    if tmp:
+        if 'changed' not in pki: pki.update({'changed':{}})
+        pki['changed'].update({'openssh' : tmp})
+
     tmp = node_changed(conf, base + ['openvpn', 'shared-secret'], recursive=True)
     if tmp:
         if 'changed' not in pki: pki.update({'changed':{}})
         pki['changed'].update({'openvpn' : tmp})
 
     # We only merge on the defaults of there is a configuration at all
     if conf.exists(base):
         # We have gathered the dict representation of the CLI, but there are default
         # options which we need to update into the dictionary retrived.
         default_values = conf.get_config_defaults(**pki.kwargs, recursive=True)
         # remove ACME default configuration if unused by CLI
         if 'certificate' in pki:
             for name, cert_config in pki['certificate'].items():
                 if 'acme' not in cert_config:
                     # Remove ACME default values
                     del default_values['certificate'][name]['acme']
 
         # merge CLI and default dictionary
         pki = config_dict_merge(default_values, pki)
 
     # Certbot triggered an external renew of the certificates.
     # Mark all ACME based certificates as "changed" to trigger
     # update of dependent services
     if 'certificate' in pki and 'certbot_renew' in pki:
         renew = []
         for name, cert_config in pki['certificate'].items():
             if 'acme' in cert_config:
                 renew.append(name)
         # If triggered externally by certbot, certificate key is not present in changed
         if 'changed' not in pki: pki.update({'changed':{}})
         pki['changed'].update({'certificate' : renew})
 
     # We need to get the entire system configuration to verify that we are not
     # deleting a certificate that is still referenced somewhere!
     pki['system'] = conf.get_config_dict([], key_mangling=('-', '_'),
                                          get_first_key=True,
                                          no_tag_node_value_mangle=True)
 
     for search in sync_search:
         for key in search['keys']:
             changed_key = sync_translate[key]
             if 'changed' not in pki or changed_key not in pki['changed']:
                 continue
 
             for item_name in pki['changed'][changed_key]:
                 node_present = False
                 if changed_key == 'openvpn':
                     node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
                 else:
                     node_present = dict_search_args(pki, changed_key, item_name)
 
                 if node_present:
                     search_dict = dict_search_args(pki['system'], *search['path'])
                     if not search_dict:
                         continue
                     for found_name, found_path in dict_search_recursive(search_dict, key):
                         if found_name == item_name:
                             path = search['path']
                             path_str = ' '.join(path + found_path)
                             print(f'PKI: Updating config: {path_str} {found_name}')
 
                             if path[0] == 'interfaces':
                                 ifname = found_path[0]
                                 set_dependents(path[1], conf, ifname)
                             else:
                                 set_dependents(path[1], conf)
 
     return pki
 
 def is_valid_certificate(raw_data):
     # If it loads correctly we're good, or return False
     return load_certificate(raw_data, wrap_tags=True)
 
 def is_valid_ca_certificate(raw_data):
     # Check if this is a valid certificate with CA attributes
     cert = load_certificate(raw_data, wrap_tags=True)
     if not cert:
         return False
     return is_ca_certificate(cert)
 
 def is_valid_public_key(raw_data):
     # If it loads correctly we're good, or return False
     return load_public_key(raw_data, wrap_tags=True)
 
 def is_valid_private_key(raw_data, protected=False):
     # If it loads correctly we're good, or return False
     # With encrypted private keys, we always return true as we cannot ask for password to verify
     if protected:
         return True
     return load_private_key(raw_data, passphrase=None, wrap_tags=True)
 
+def is_valid_openssh_public_key(raw_data, type):
+    # If it loads correctly we're good, or return False
+    return load_openssh_public_key(raw_data, type)
+
+def is_valid_openssh_private_key(raw_data, protected=False):
+    # If it loads correctly we're good, or return False
+    # With encrypted private keys, we always return true as we cannot ask for password to verify
+    if protected:
+        return True
+    return load_openssh_private_key(raw_data, passphrase=None, wrap_tags=True)
+
 def is_valid_crl(raw_data):
     # If it loads correctly we're good, or return False
     return load_crl(raw_data, wrap_tags=True)
 
 def is_valid_dh_parameters(raw_data):
     # If it loads correctly we're good, or return False
     return load_dh_parameters(raw_data, wrap_tags=True)
 
 def verify(pki):
     if not pki:
         return None
 
     if 'ca' in pki:
         for name, ca_conf in pki['ca'].items():
             if 'certificate' in ca_conf:
                 if not is_valid_ca_certificate(ca_conf['certificate']):
                     raise ConfigError(f'Invalid certificate on CA certificate "{name}"')
 
             if 'private' in ca_conf and 'key' in ca_conf['private']:
                 private = ca_conf['private']
                 protected = 'password_protected' in private
 
                 if not is_valid_private_key(private['key'], protected):
                     raise ConfigError(f'Invalid private key on CA certificate "{name}"')
 
             if 'crl' in ca_conf:
                 ca_crls = ca_conf['crl']
                 if isinstance(ca_crls, str):
                     ca_crls = [ca_crls]
 
                 for crl in ca_crls:
                     if not is_valid_crl(crl):
                         raise ConfigError(f'Invalid CRL on CA certificate "{name}"')
 
     if 'certificate' in pki:
         for name, cert_conf in pki['certificate'].items():
             if 'certificate' in cert_conf:
                 if not is_valid_certificate(cert_conf['certificate']):
                     raise ConfigError(f'Invalid certificate on certificate "{name}"')
 
             if 'private' in cert_conf and 'key' in cert_conf['private']:
                 private = cert_conf['private']
                 protected = 'password_protected' in private
 
                 if not is_valid_private_key(private['key'], protected):
                     raise ConfigError(f'Invalid private key on certificate "{name}"')
 
             if 'acme' in cert_conf:
                 if 'domain_name' not in cert_conf['acme']:
                     raise ConfigError(f'At least one domain-name is required to request '\
                                     f'certificate for "{name}" via ACME!')
 
                 if 'email' not in cert_conf['acme']:
                     raise ConfigError(f'An email address is required to request '\
                                     f'certificate for "{name}" via ACME!')
 
                 if 'certbot_renew' not in pki:
                     # Only run the ACME command if something on this entity changed,
                     # as this is time intensive
                     tmp = dict_search('changed.certificate', pki)
                     if tmp != None and name in tmp:
                         certbot_request(name, cert_conf['acme'])
 
     if 'dh' in pki:
         for name, dh_conf in pki['dh'].items():
             if 'parameters' in dh_conf:
                 if not is_valid_dh_parameters(dh_conf['parameters']):
                     raise ConfigError(f'Invalid DH parameters on "{name}"')
 
     if 'key_pair' in pki:
         for name, key_conf in pki['key_pair'].items():
             if 'public' in key_conf and 'key' in key_conf['public']:
                 if not is_valid_public_key(key_conf['public']['key']):
                     raise ConfigError(f'Invalid public key on key-pair "{name}"')
 
             if 'private' in key_conf and 'key' in key_conf['private']:
                 private = key_conf['private']
                 protected = 'password_protected' in private
                 if not is_valid_private_key(private['key'], protected):
                     raise ConfigError(f'Invalid private key on key-pair "{name}"')
 
+    if 'openssh' in pki:
+        for name, key_conf in pki['openssh'].items():
+            if 'public' in key_conf and 'key' in key_conf['public']:
+                if 'type' not in key_conf['public']:
+                    raise ConfigError(f'Must define OpenSSH public key type for "{name}"')
+                if not is_valid_openssh_public_key(key_conf['public']['key'], key_conf['public']['type']):
+                    raise ConfigError(f'Invalid OpenSSH public key "{name}"')
+
+            if 'private' in key_conf and 'key' in key_conf['private']:
+                private = key_conf['private']
+                protected = 'password_protected' in private
+                if not is_valid_openssh_private_key(private['key'], protected):
+                    raise ConfigError(f'Invalid OpenSSH private key "{name}"')
+
     if 'x509' in pki:
         if 'default' in pki['x509']:
             default_values = pki['x509']['default']
             if 'country' in default_values:
                 country = default_values['country']
                 if len(country) != 2 or not country.isalpha():
                     raise ConfigError(f'Invalid default country value. Value must be 2 alpha characters.')
 
     if 'changed' in pki:
         # if the list is getting longer, we can move to a dict() and also embed the
         # search key as value from line 173 or 176
         for search in sync_search:
             for key in search['keys']:
                 changed_key = sync_translate[key]
 
                 if changed_key not in pki['changed']:
                     continue
 
                 for item_name in pki['changed'][changed_key]:
                     node_present = False
                     if changed_key == 'openvpn':
                         node_present = dict_search_args(pki, 'openvpn', 'shared_secret', item_name)
                     else:
                         node_present = dict_search_args(pki, changed_key, item_name)
 
                     if not node_present:
                         search_dict = dict_search_args(pki['system'], *search['path'])
 
                         if not search_dict:
                             continue
 
                         for found_name, found_path in dict_search_recursive(search_dict, key):
                             if found_name == item_name:
                                 path_str = " ".join(search['path'] + found_path)
                                 raise ConfigError(f'PKI object "{item_name}" still in use by "{path_str}"')
 
     return None
 
 def generate(pki):
     if not pki:
         return None
 
     # Certbot renewal only needs to re-trigger the services to load up the
     # new PEM file
     if 'certbot_renew' in pki:
         return None
 
     certbot_list = []
     certbot_list_on_disk = []
     if os.path.exists(f'{vyos_certbot_dir}/live'):
         certbot_list_on_disk = [f.path.split('/')[-1] for f in os.scandir(f'{vyos_certbot_dir}/live') if f.is_dir()]
 
     if 'certificate' in pki:
         changed_certificates = dict_search('changed.certificate', pki)
         for name, cert_conf in pki['certificate'].items():
             if 'acme' in cert_conf:
                 certbot_list.append(name)
                 # generate certificate if not found on disk
                 if name not in certbot_list_on_disk:
                     certbot_request(name, cert_conf['acme'], dry_run=False)
                 elif changed_certificates != None and name in changed_certificates:
                     # when something for the certificate changed, we should delete it
                     if name in certbot_list_on_disk:
                         certbot_delete(name)
                     certbot_request(name, cert_conf['acme'], dry_run=False)
 
     # Cleanup certbot configuration and certificates if no longer in use by CLI
     # Get foldernames under vyos_certbot_dir which each represent a certbot cert
     if os.path.exists(f'{vyos_certbot_dir}/live'):
         for cert in certbot_list_on_disk:
             if cert not in certbot_list:
                 # certificate is no longer active on the CLI - remove it
                 certbot_delete(cert)
 
     return None
 
 def apply(pki):
     systemd_certbot_name = 'certbot.timer'
     if not pki:
         call(f'systemctl stop {systemd_certbot_name}')
         return None
 
     has_certbot = False
     if 'certificate' in pki:
         for name, cert_conf in pki['certificate'].items():
             if 'acme' in cert_conf:
                 has_certbot = True
                 break
 
     if not has_certbot:
         call(f'systemctl stop {systemd_certbot_name}')
     elif has_certbot and not is_systemd_service_active(systemd_certbot_name):
         call(f'systemctl restart {systemd_certbot_name}')
 
     if 'changed' in pki:
         call_dependents()
 
     return None
 
 if __name__ == '__main__':
     try:
         c = get_config()
         verify(c)
         generate(c)
         apply(c)
     except ConfigError as e:
         print(e)
         exit(1)