diff --git a/debian/vyos-1x.install b/debian/vyos-1x.install index fff6ebeab..502fc7aaa 100644 --- a/debian/vyos-1x.install +++ b/debian/vyos-1x.install @@ -1,44 +1,45 @@ etc/bash_completion.d etc/commit etc/default etc/dhcp etc/ipsec.d etc/logrotate.d etc/netplug etc/opennhrp etc/modprobe.d etc/ppp etc/rsyslog.conf etc/securetty etc/security etc/skel etc/sudoers.d etc/systemd etc/sysctl.d etc/telegraf etc/udev etc/update-motd.d etc/vyos lib/ opt/ srv/localui usr/sbin usr/bin/config-mgmt usr/bin/initial-setup usr/bin/vyos-show-config usr/bin/vyos-config-file-query usr/bin/vyos-config-to-commands usr/bin/vyos-config-to-json +usr/bin/vyos-commands-to-config usr/bin/vyos-hostsd-client usr/lib usr/libexec/vyos/activate usr/libexec/vyos/completion usr/libexec/vyos/conf_mode usr/libexec/vyos/init usr/libexec/vyos/op_mode usr/libexec/vyos/services usr/libexec/vyos/system usr/libexec/vyos/validators usr/libexec/vyos/*.py usr/libexec/vyos/*.sh usr/share diff --git a/python/vyos/utils/config.py b/python/vyos/utils/config.py index 33047010b..deda13c13 100644 --- a/python/vyos/utils/config.py +++ b/python/vyos/utils/config.py @@ -1,39 +1,105 @@ # Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os +from typing import TYPE_CHECKING + from vyos.defaults import directories +# https://peps.python.org/pep-0484/#forward-references +if TYPE_CHECKING: + from vyos.configtree import ConfigTree + config_file = os.path.join(directories['config'], 'config.boot') def read_saved_value(path: list): if not isinstance(path, list) or not path: return '' from vyos.configtree import ConfigTree try: with open(config_file) as f: config_string = f.read() ct = ConfigTree(config_string) except Exception: return '' if not ct.exists(path): return '' res = ct.return_values(path) if len(res) == 1: return res[0] res = ct.list_nodes(path) if len(res) == 1: return ' '.join(res) return res + +def flag(l: list) -> list: + res = [l[0:i] for i,_ in enumerate(l, start=1)] + return res + +def tag_node_of_path(p: list) -> list: + from vyos.xml_ref import is_tag + + fl = flag(p) + res = list(map(is_tag, fl)) + + return res + +def set_tags(ct: 'ConfigTree', path: list) -> None: + fl = flag(path) + if_tag = tag_node_of_path(path) + for condition, target in zip(if_tag, fl): + if condition: + ct.set_tag(target) + +def parse_commands(cmds: str) -> dict: + from re import split as re_split + from shlex import split as shlex_split + + from vyos.xml_ref import definition + from vyos.xml_ref.pkg_cache.vyos_1x_cache import reference + + ref_tree = definition.Xml() + ref_tree.define(reference) + + res = [] + + cmds = re_split(r'\n+', cmds) + for c in cmds: + cmd_parts = shlex_split(c) + + if not cmd_parts: + # Ignore empty lines + continue + + path = cmd_parts[1:] + op = cmd_parts[0] + + try: + path, value = ref_tree.split_path(path) + except ValueError as e: + raise ValueError(f'Incorrect command: {e}') + + entry = {} + entry["op"] = op + entry["path"] = path + entry["value"] = value + + entry["is_multi"] = ref_tree.is_multi(path) + entry["is_leaf"] = ref_tree.is_leaf(path) + entry["is_tag"] = ref_tree.is_tag(path) + + res.append(entry) + + return res diff --git a/python/vyos/xml_ref/definition.py b/python/vyos/xml_ref/definition.py index 5ff28daed..4e755ab72 100644 --- a/python/vyos/xml_ref/definition.py +++ b/python/vyos/xml_ref/definition.py @@ -1,339 +1,365 @@ # Copyright 2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. -from typing import Optional, Union, Any, TYPE_CHECKING +from typing import Tuple, Optional, Union, Any, TYPE_CHECKING # https://peps.python.org/pep-0484/#forward-references # for type 'ConfigDict' if TYPE_CHECKING: from vyos.config import ConfigDict def set_source_recursive(o: Union[dict, str, list], b: bool): d = {} if not isinstance(o, dict): d = {'_source': b} else: for k, v in o.items(): d[k] = set_source_recursive(v, b) d |= {'_source': b} return d def source_dict_merge(src: dict, dest: dict): from copy import deepcopy dst = deepcopy(dest) from_src = {} for key, value in src.items(): if key not in dst: dst[key] = value from_src[key] = set_source_recursive(value, True) elif isinstance(src[key], dict): dst[key], f = source_dict_merge(src[key], dst[key]) f |= {'_source': False} from_src[key] = f return dst, from_src def ext_dict_merge(src: dict, dest: Union[dict, 'ConfigDict']): d, f = source_dict_merge(src, dest) if hasattr(d, '_from_defaults'): setattr(d, '_from_defaults', f) return d def from_source(d: dict, path: list) -> bool: for key in path: d = d[key] if key in d else {} if not d or not isinstance(d, dict): return False return d.get('_source', False) class Xml: def __init__(self): self.ref = {} def define(self, ref: dict): self.ref = ref def _get_ref_node_data(self, node: dict, data: str) -> Union[bool, str]: res = node.get('node_data', {}) if not res: raise ValueError("non-existent node data") if data not in res: raise ValueError("non-existent data field") return res.get(data) def _get_ref_path(self, path: list) -> dict: ref_path = path.copy() d = self.ref while ref_path and d: d = d.get(ref_path[0], {}) ref_path.pop(0) if self._is_tag_node(d) and ref_path: ref_path.pop(0) return d def _is_tag_node(self, node: dict) -> bool: res = self._get_ref_node_data(node, 'node_type') return res == 'tag' + def exists(self, path: list) -> bool: + try: + _ = self._get_ref_path(path) + return True + except ValueError: + return False + + def split_path(self, path: list) -> Tuple[list, Optional[str]]: + """ Splits a list into config path and value components """ + + # First, check if the complete path is valid by itself + if self.exists(path): + if self.is_valueless(path) or not self.is_leaf(path): + # It's a complete path for a valueless node + # or a path to an empy non-leaf node + return (path, None) + else: + raise ValueError(f'Path "{path}" needs a value or children') + else: + # If the complete path doesn't exist, it's probably a path with a value + if self.exists(path[0:-1]): + return (path[0:-1], path[-1]) + else: + # Or not a valid path at all + raise ValueError(f'Path "{path}" is incorrect') + def is_tag(self, path: list) -> bool: ref_path = path.copy() d = self.ref while ref_path and d: d = d.get(ref_path[0], {}) ref_path.pop(0) if self._is_tag_node(d) and ref_path: if len(ref_path) == 1: return False ref_path.pop(0) return self._is_tag_node(d) def is_tag_value(self, path: list) -> bool: if len(path) < 2: return False return self.is_tag(path[:-1]) def _is_multi_node(self, node: dict) -> bool: b = self._get_ref_node_data(node, 'multi') assert isinstance(b, bool) return b def is_multi(self, path: list) -> bool: d = self._get_ref_path(path) return self._is_multi_node(d) def _is_valueless_node(self, node: dict) -> bool: b = self._get_ref_node_data(node, 'valueless') assert isinstance(b, bool) return b def is_valueless(self, path: list) -> bool: d = self._get_ref_path(path) return self._is_valueless_node(d) def _is_leaf_node(self, node: dict) -> bool: res = self._get_ref_node_data(node, 'node_type') return res == 'leaf' def is_leaf(self, path: list) -> bool: d = self._get_ref_path(path) return self._is_leaf_node(d) def _least_upper_data(self, path: list, name: str) -> str: ref_path = path.copy() d = self.ref data = '' tag = '' while ref_path and d: tag_val = '' d = d.get(ref_path[0], {}) ref_path.pop(0) if self._is_tag_node(d) and ref_path: tag_val = ref_path[0] ref_path.pop(0) if self._is_leaf_node(d) and ref_path: ref_path.pop(0) res = self._get_ref_node_data(d, name) if res is not None: data = res tag = tag_val return data, tag def owner(self, path: list, with_tag=False) -> str: from pathlib import Path data, tag = self._least_upper_data(path, 'owner') tag_ext = f'_{tag}' if tag else '' if data: if with_tag: data = Path(data.split()[0]).stem data = f'{data}{tag_ext}' else: data = Path(data.split()[0]).name return data def priority(self, path: list) -> str: data, _ = self._least_upper_data(path, 'priority') return data @staticmethod def _dict_get(d: dict, path: list) -> dict: for i in path: d = d.get(i, {}) if not isinstance(d, dict): return {} if not d: break return d def _dict_find(self, d: dict, key: str, non_local=False) -> bool: for k in list(d): if k in ('node_data', 'component_version'): continue if k == key: return True if non_local and isinstance(d[k], dict): if self._dict_find(d[k], key): return True return False def cli_defined(self, path: list, node: str, non_local=False) -> bool: d = self._dict_get(self.ref, path) return self._dict_find(d, node, non_local=non_local) def component_version(self) -> dict: d = {} for k, v in self.ref['component_version'].items(): d[k] = int(v) return d def multi_to_list(self, rpath: list, conf: dict) -> dict: res: Any = {} for k in list(conf): d = self._get_ref_path(rpath + [k]) if self._is_leaf_node(d): if self._is_multi_node(d) and not isinstance(conf[k], list): res[k] = [conf[k]] else: res[k] = conf[k] else: res[k] = self.multi_to_list(rpath + [k], conf[k]) return res def _get_default_value(self, node: dict) -> Optional[str]: return self._get_ref_node_data(node, "default_value") def _get_default(self, node: dict) -> Optional[Union[str, list]]: default = self._get_default_value(node) if default is None: return None if self._is_multi_node(node): return default.split() return default def default_value(self, path: list) -> Optional[Union[str, list]]: d = self._get_ref_path(path) default = self._get_default_value(d) if default is None: return None if self._is_multi_node(d) or self._is_tag_node(d): return default.split() return default def get_defaults(self, path: list, get_first_key=False, recursive=False) -> dict: """Return dict containing default values below path Note that descent below path will not proceed beyond an encountered tag node, as no tag node value is known. For a default dict relative to an existing config dict containing tag node values, see function: 'relative_defaults' """ res: dict = {} if self.is_tag(path): return res d = self._get_ref_path(path) if self._is_leaf_node(d): default_value = self._get_default(d) if default_value is not None: return {path[-1]: default_value} if path else {} for k in list(d): if k in ('node_data', 'component_version') : continue if self._is_leaf_node(d[k]): default_value = self._get_default(d[k]) if default_value is not None: res |= {k: default_value} elif self.is_tag(path + [k]): # tag node defaults are used as suggestion, not default value; # should this change, append to path and continue if recursive pass else: if recursive: pos = self.get_defaults(path + [k], recursive=True) res |= pos if res: if get_first_key or not path: return res return {path[-1]: res} return {} def _well_defined(self, path: list, conf: dict) -> bool: # test disjoint path + conf for sensible config paths def step(c): return [next(iter(c.keys()))] if c else [] try: tmp = step(conf) if tmp and self.is_tag_value(path + tmp): c = conf[tmp[0]] if not isinstance(c, dict): raise ValueError tmp = tmp + step(c) self._get_ref_path(path + tmp) else: self._get_ref_path(path + tmp) except ValueError: return False return True def _relative_defaults(self, rpath: list, conf: dict, recursive=False) -> dict: res: dict = {} res = self.get_defaults(rpath, recursive=recursive, get_first_key=True) for k in list(conf): if isinstance(conf[k], dict): step = self._relative_defaults(rpath + [k], conf=conf[k], recursive=recursive) res |= step if res: return {rpath[-1]: res} if rpath else res return {} def relative_defaults(self, path: list, conf: dict, get_first_key=False, recursive=False) -> dict: """Return dict containing defaults along paths of a config dict """ if not conf: return self.get_defaults(path, get_first_key=get_first_key, recursive=recursive) if not self._well_defined(path, conf): # adjust for possible overlap: if path and path[-1] in list(conf): conf = conf[path[-1]] conf = {} if not isinstance(conf, dict) else conf if not self._well_defined(path, conf): print('path to config dict does not define full config paths') return {} res = self._relative_defaults(path, conf, recursive=recursive) if get_first_key and path: if res.values(): res = next(iter(res.values())) else: res = {} return res diff --git a/src/utils/vyos-commands-to-config b/src/utils/vyos-commands-to-config new file mode 100755 index 000000000..927d9bd70 --- /dev/null +++ b/src/utils/vyos-commands-to-config @@ -0,0 +1,53 @@ +#! /usr/bin/python3 +# +# Copyright (C) 2024 VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import sys +import json + +from vyos.configtree import ConfigTree +from vyos.utils.config import parse_commands +from vyos.utils.config import set_tags + +def commands_to_config(cmds): + ct = ConfigTree('') + cmds = parse_commands(cmds) + + for c in cmds: + if c["op"] == "set": + if c["is_leaf"]: + replace = False if c["is_multi"] else True + ct.set(c["path"], value=c["value"], replace=replace) + set_tags(ct, c["path"]) + else: + ct.create_node(c["path"]) + set_tags(ct, c["path"]) + else: + raise ValueError( + f"\"{c['op']}\" is not a supported config operation") + + return ct + + +if __name__ == '__main__': + try: + cmds = sys.stdin.read() + ct = commands_to_config(cmds) + out = ConfigTree(ct.to_string()) + print(str(out)) + except Exception as e: + print(e) + sys.exit(1)