diff --git a/python/vyos/configdep.py b/python/vyos/configdep.py index 64727d355..73bd9ea96 100644 --- a/python/vyos/configdep.py +++ b/python/vyos/configdep.py @@ -1,149 +1,190 @@ -# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# Copyright 2023-2024 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. import os import json import typing from inspect import stack from graphlib import TopologicalSorter, CycleError from vyos.utils.system import load_as_module from vyos.configdict import dict_merge from vyos.defaults import directories from vyos.configsource import VyOSError from vyos import ConfigError # https://peps.python.org/pep-0484/#forward-references # for type 'Config' if typing.TYPE_CHECKING: from vyos.config import Config dependency_dir = os.path.join(directories['data'], 'config-mode-dependencies') -dependent_func: dict[str, list[typing.Callable]] = {} +local_dependent_func: dict[str, list[typing.Callable]] = {} + +DEBUG = False +FORCE_LOCAL = False + +def debug_print(s: str): + if DEBUG: + print(s) def canon_name(name: str) -> str: return os.path.splitext(name)[0].replace('-', '_') def canon_name_of_path(path: str) -> str: script = os.path.basename(path) return canon_name(script) def caller_name() -> str: return stack()[2].filename +def name_of(f: typing.Callable) -> str: + return f.__name__ + +def names_of(l: list[typing.Callable]) -> list[str]: + return [name_of(f) for f in l] + +def remove_redundant(l: list[typing.Callable]) -> list[typing.Callable]: + names = set() + for e in reversed(l): + _ = l.remove(e) if name_of(e) in names else names.add(name_of(e)) + +def append_uniq(l: list[typing.Callable], e: typing.Callable): + """Append an element, removing earlier occurrences + + The list of dependencies is generally short and traversing the list on + each append is preferable to the cost of redundant script invocation. + """ + l.append(e) + remove_redundant(l) + def read_dependency_dict(dependency_dir: str = dependency_dir) -> dict: res = {} for dep_file in os.listdir(dependency_dir): if not dep_file.endswith('.json'): continue path = os.path.join(dependency_dir, dep_file) with open(path) as f: d = json.load(f) if dep_file == 'vyos-1x.json': res = dict_merge(res, d) else: res = dict_merge(d, res) return res def get_dependency_dict(config: 'Config') -> dict: if hasattr(config, 'cached_dependency_dict'): d = getattr(config, 'cached_dependency_dict') else: d = read_dependency_dict() setattr(config, 'cached_dependency_dict', d) return d def run_config_mode_script(script: str, config: 'Config'): path = os.path.join(directories['conf_mode'], script) name = canon_name(script) mod = load_as_module(name, path) config.set_level([]) try: c = mod.get_config(config) mod.verify(c) mod.generate(c) mod.apply(c) except (VyOSError, ConfigError) as e: raise ConfigError(repr(e)) def def_closure(target: str, config: 'Config', tagnode: typing.Optional[str] = None) -> typing.Callable: script = target + '.py' def func_impl(): if tagnode: os.environ['VYOS_TAGNODE_VALUE'] = tagnode run_config_mode_script(script, config) return func_impl def set_dependents(case: str, config: 'Config', tagnode: typing.Optional[str] = None): d = get_dependency_dict(config) k = canon_name_of_path(caller_name()) - l = dependent_func.setdefault(k, []) + tag_ext = f'_{tagnode}' if tagnode is not None else '' + if hasattr(config, 'dependent_func') and not FORCE_LOCAL: + dependent_func = getattr(config, 'dependent_func') + l = dependent_func.setdefault('vyos_configd', []) + else: + dependent_func = local_dependent_func + l = dependent_func.setdefault(k, []) for target in d[k][case]: func = def_closure(target, config, tagnode) - l.append(func) + func.__name__ = f'{target}{tag_ext}' + append_uniq(l, func) + debug_print(f'set_dependents: caller {k}, dependents {names_of(l)}') -def call_dependents(): +def call_dependents(dependent_func: dict = None): k = canon_name_of_path(caller_name()) - l = dependent_func.get(k, []) + if dependent_func is None or FORCE_LOCAL: + dependent_func = local_dependent_func + l = dependent_func.get(k, []) + else: + l = dependent_func.get('vyos_configd', []) + debug_print(f'call_dependents: caller {k}, dependents {names_of(l)}') while l: f = l.pop(0) + debug_print(f'calling: {f.__name__}') f() def called_as_dependent() -> bool: st = stack()[1:] for f in st: if f.filename == __file__: return True return False def graph_from_dependency_dict(d: dict) -> dict: g = {} for k in list(d): g[k] = set() # add the dependencies for every sub-case; should there be cases # that are mutally exclusive in the future, the graphs will be # distinguished for el in list(d[k]): g[k] |= set(d[k][el]) return g def is_acyclic(d: dict) -> bool: g = graph_from_dependency_dict(d) ts = TopologicalSorter(g) try: # get node iterator order = ts.static_order() # try iteration _ = [*order] except CycleError: return False return True def check_dependency_graph(dependency_dir: str = dependency_dir, supplement: str = None) -> bool: d = read_dependency_dict(dependency_dir=dependency_dir) if supplement is not None: with open(supplement) as f: d = dict_merge(json.load(f), d) return is_acyclic(d) diff --git a/src/services/vyos-configd b/src/services/vyos-configd index 355182b26..648a017d5 100755 --- a/src/services/vyos-configd +++ b/src/services/vyos-configd @@ -1,289 +1,302 @@ #!/usr/bin/env python3 # -# Copyright (C) 2020-2023 VyOS maintainers and contributors +# Copyright (C) 2020-2024 VyOS maintainers and contributors # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License version 2 or later as # published by the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import os import sys import grp import re import json +import typing import logging import signal import importlib.util import zmq from contextlib import contextmanager from vyos.defaults import directories from vyos.utils.boot import boot_configuration_complete from vyos.configsource import ConfigSourceString from vyos.configsource import ConfigSourceError +from vyos.configdep import call_dependents from vyos.config import Config from vyos import ConfigError CFG_GROUP = 'vyattacfg' script_stdout_log = '/tmp/vyos-configd-script-stdout' debug = True logger = logging.getLogger(__name__) logs_handler = logging.StreamHandler() logger.addHandler(logs_handler) if debug: logger.setLevel(logging.DEBUG) else: logger.setLevel(logging.INFO) SOCKET_PATH = "ipc:///run/vyos-configd.sock" # Response error codes R_SUCCESS = 1 R_ERROR_COMMIT = 2 R_ERROR_DAEMON = 4 R_PASS = 8 vyos_conf_scripts_dir = directories['conf_mode'] configd_include_file = os.path.join(directories['data'], 'configd-include.json') configd_env_set_file = os.path.join(directories['data'], 'vyos-configd-env-set') configd_env_unset_file = os.path.join(directories['data'], 'vyos-configd-env-unset') # sourced on entering config session configd_env_file = '/etc/default/vyos-configd-env' session_out = None session_mode = None def key_name_from_file_name(f): return os.path.splitext(f)[0] def module_name_from_key(k): return k.replace('-', '_') def path_from_file_name(f): return os.path.join(vyos_conf_scripts_dir, f) # opt-in to be run by daemon with open(configd_include_file) as f: try: include = json.load(f) except OSError as e: logger.critical(f"configd include file error: {e}") sys.exit(1) except json.JSONDecodeError as e: logger.critical(f"JSON load error: {e}") sys.exit(1) # import conf_mode scripts (_, _, filenames) = next(iter(os.walk(vyos_conf_scripts_dir))) filenames.sort() load_filenames = [f for f in filenames if f in include] imports = [key_name_from_file_name(f) for f in load_filenames] module_names = [module_name_from_key(k) for k in imports] paths = [path_from_file_name(f) for f in load_filenames] to_load = list(zip(module_names, paths)) modules = [] for x in to_load: spec = importlib.util.spec_from_file_location(x[0], x[1]) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) modules.append(module) conf_mode_scripts = dict(zip(imports, modules)) exclude_set = {key_name_from_file_name(f) for f in filenames if f not in include} include_set = {key_name_from_file_name(f) for f in filenames if f in include} @contextmanager def stdout_redirected(filename, mode): saved_stdout_fd = None destination_file = None try: sys.stdout.flush() saved_stdout_fd = os.dup(sys.stdout.fileno()) destination_file = open(filename, mode) os.dup2(destination_file.fileno(), sys.stdout.fileno()) yield finally: if saved_stdout_fd is not None: os.dup2(saved_stdout_fd, sys.stdout.fileno()) os.close(saved_stdout_fd) if destination_file is not None: destination_file.close() def explicit_print(path, mode, msg): try: with open(path, mode) as f: f.write(f"\n{msg}\n\n") except OSError: logger.critical("error explicit_print") def run_script(script, config, args) -> int: script.argv = args config.set_level([]) try: c = script.get_config(config) script.verify(c) script.generate(c) script.apply(c) except ConfigError as e: logger.critical(e) explicit_print(session_out, session_mode, str(e)) return R_ERROR_COMMIT except Exception as e: logger.critical(e) return R_ERROR_DAEMON return R_SUCCESS def initialization(socket): global session_out global session_mode # Reset config strings: active_string = '' session_string = '' # check first for resent init msg, in case of client timeout while True: msg = socket.recv().decode("utf-8", "ignore") try: message = json.loads(msg) if message["type"] == "init": resp = "init" socket.send(resp.encode()) except: break # zmq synchronous for ipc from single client: active_string = msg resp = "active" socket.send(resp.encode()) session_string = socket.recv().decode("utf-8", "ignore") resp = "session" socket.send(resp.encode()) pid_string = socket.recv().decode("utf-8", "ignore") resp = "pid" socket.send(resp.encode()) logger.debug(f"config session pid is {pid_string}") try: session_out = os.readlink(f"/proc/{pid_string}/fd/1") session_mode = 'w' except FileNotFoundError: session_out = None # if not a 'live' session, for example on boot, write to file if not session_out or not boot_configuration_complete(): session_out = script_stdout_log session_mode = 'a' try: configsource = ConfigSourceString(running_config_text=active_string, session_config_text=session_string) except ConfigSourceError as e: logger.debug(e) return None config = Config(config_source=configsource) + dependent_func: dict[str, list[typing.Callable]] = {} + setattr(config, 'dependent_func', dependent_func) return config -def process_node_data(config, data) -> int: +def process_node_data(config, data, last: bool = False) -> int: if not config: logger.critical(f"Empty config") return R_ERROR_DAEMON script_name = None args = [] res = re.match(r'^(VYOS_TAGNODE_VALUE=[^/]+)?.*\/([^/]+).py(.*)', data) if res.group(1): env = res.group(1).split('=') os.environ[env[0]] = env[1] if res.group(2): script_name = res.group(2) if not script_name: logger.critical(f"Missing script_name") return R_ERROR_DAEMON if res.group(3): args = res.group(3).split() args.insert(0, f'{script_name}.py') if script_name not in include_set: + # call dependents now if last element of prio queue is run + # independent of configd + if last: + call_dependents(dependent_func=config.dependent_func) return R_PASS with stdout_redirected(session_out, session_mode): result = run_script(conf_mode_scripts[script_name], config, args) + if last: + call_dependents(dependent_func=config.dependent_func) + return result def remove_if_file(f: str): try: os.remove(f) except FileNotFoundError: pass except OSError: raise def shutdown(): remove_if_file(configd_env_file) os.symlink(configd_env_unset_file, configd_env_file) sys.exit(0) if __name__ == '__main__': context = zmq.Context() socket = context.socket(zmq.REP) # Set the right permissions on the socket, then change it back o_mask = os.umask(0) socket.bind(SOCKET_PATH) os.umask(o_mask) cfg_group = grp.getgrnam(CFG_GROUP) os.setgid(cfg_group.gr_gid) os.environ['SUDO_USER'] = 'vyos' os.environ['SUDO_GID'] = str(cfg_group.gr_gid) def sig_handler(signum, frame): shutdown() signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGINT, sig_handler) # Define the vyshim environment variable remove_if_file(configd_env_file) os.symlink(configd_env_set_file, configd_env_file) config = None while True: # Wait for next request from client msg = socket.recv().decode() logger.debug(f"Received message: {msg}") message = json.loads(msg) if message["type"] == "init": resp = "init" socket.send(resp.encode()) config = initialization(socket) elif message["type"] == "node": - res = process_node_data(config, message["data"]) + if message["last"]: + logger.debug(f'final element of priority queue') + res = process_node_data(config, message["data"], message["last"]) response = res.to_bytes(1, byteorder=sys.byteorder) logger.debug(f"Sending response {res}") socket.send(response) else: logger.critical(f"Unexpected message: {message}") diff --git a/src/shim/vyshim.c b/src/shim/vyshim.c index cae8b6152..41723e7a4 100644 --- a/src/shim/vyshim.c +++ b/src/shim/vyshim.c @@ -1,301 +1,309 @@ /* - * Copyright (C) 2020 VyOS maintainers and contributors + * Copyright (C) 2020-2024 VyOS maintainers and contributors * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 or later as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * */ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <string.h> #include <sys/time.h> #include <time.h> #include <stdint.h> #include <sys/types.h> #include <sys/wait.h> #include <zmq.h> #include "mkjson.h" /* * * */ #if DEBUG #define DEBUG_ON 1 #else #define DEBUG_ON 0 #endif #define debug_print(fmt, ...) \ do { if (DEBUG_ON) fprintf(stderr, fmt, ##__VA_ARGS__); } while (0) #define debug_call(f) \ do { if (DEBUG_ON) f; } while (0) #define SOCKET_PATH "ipc:///run/vyos-configd.sock" #define GET_ACTIVE "cli-shell-api --show-active-only --show-show-defaults --show-ignore-edit showConfig" #define GET_SESSION "cli-shell-api --show-working-only --show-show-defaults --show-ignore-edit showConfig" #define COMMIT_MARKER "/var/tmp/initial_in_commit" +#define QUEUE_MARKER "/var/tmp/last_in_queue" enum { SUCCESS = 1 << 0, ERROR_COMMIT = 1 << 1, ERROR_DAEMON = 1 << 2, PASS = 1 << 3 }; volatile int init_alarm = 0; volatile int timeout = 0; int initialization(void *); int pass_through(char **, int); void timer_handler(int); double get_posix_clock_time(void); int main(int argc, char* argv[]) { // string for node data: conf_mode script and tagnode, if applicable char string_node_data[256]; string_node_data[0] = '\0'; void *context = zmq_ctx_new(); void *requester = zmq_socket(context, ZMQ_REQ); int ex_index; int init_timeout = 0; + int last = 0; debug_print("Connecting to vyos-configd ...\n"); zmq_connect(requester, SOCKET_PATH); for (int i = 1; i < argc ; i++) { strncat(&string_node_data[0], argv[i], 127); } debug_print("data to send: %s\n", string_node_data); char *test = strstr(string_node_data, "VYOS_TAGNODE_VALUE"); ex_index = test ? 2 : 1; if (access(COMMIT_MARKER, F_OK) != -1) { init_timeout = initialization(requester); if (!init_timeout) remove(COMMIT_MARKER); } // if initial communication failed, pass through execution of script if (init_timeout) { int ret = pass_through(argv, ex_index); return ret; } + if (access(QUEUE_MARKER, F_OK) != -1) { + last = 1; + remove(QUEUE_MARKER); + } + char error_code[1]; debug_print("Sending node data ...\n"); - char *string_node_data_msg = mkjson(MKJSON_OBJ, 2, + char *string_node_data_msg = mkjson(MKJSON_OBJ, 3, MKJSON_STRING, "type", "node", + MKJSON_BOOL, "last", last, MKJSON_STRING, "data", &string_node_data[0]); zmq_send(requester, string_node_data_msg, strlen(string_node_data_msg), 0); zmq_recv(requester, error_code, 1, 0); debug_print("Received node data receipt\n"); int err = (int)error_code[0]; free(string_node_data_msg); zmq_close(requester); zmq_ctx_destroy(context); if (err & PASS) { debug_print("Received PASS\n"); int ret = pass_through(argv, ex_index); return ret; } if (err & ERROR_DAEMON) { debug_print("Received ERROR_DAEMON\n"); int ret = pass_through(argv, ex_index); return ret; } if (err & ERROR_COMMIT) { debug_print("Received ERROR_COMMIT\n"); return -1; } return 0; } int initialization(void* Requester) { char *active_str = NULL; size_t active_len = 0; char *session_str = NULL; size_t session_len = 0; char *empty_string = "\n"; char buffer[16]; struct sigaction sa; struct itimerval timer, none_timer; memset(&sa, 0, sizeof(sa)); sa.sa_handler = &timer_handler; sigaction(SIGALRM, &sa, NULL); timer.it_value.tv_sec = 0; timer.it_value.tv_usec = 10000; timer.it_interval.tv_sec = timer.it_interval.tv_usec = 0; none_timer.it_value.tv_sec = none_timer.it_value.tv_usec = 0; none_timer.it_interval.tv_sec = none_timer.it_interval.tv_usec = 0; double prev_time_value, time_value; double time_diff; char *pid_val = getenv("VYATTA_CONFIG_TMP"); strsep(&pid_val, "_"); debug_print("config session pid: %s\n", pid_val); debug_print("Sending init announcement\n"); char *init_announce = mkjson(MKJSON_OBJ, 1, MKJSON_STRING, "type", "init"); // check for timeout on initial contact while (!init_alarm) { debug_call(prev_time_value = get_posix_clock_time()); setitimer(ITIMER_REAL, &timer, NULL); zmq_send(Requester, init_announce, strlen(init_announce), 0); zmq_recv(Requester, buffer, 16, 0); setitimer(ITIMER_REAL, &none_timer, &timer); debug_call(time_value = get_posix_clock_time()); debug_print("Received init receipt\n"); debug_call(time_diff = time_value - prev_time_value); debug_print("time elapse %f\n", time_diff); break; } free(init_announce); if (timeout) return -1; FILE *fp_a = popen(GET_ACTIVE, "r"); getdelim(&active_str, &active_len, '\0', fp_a); int ret = pclose(fp_a); if (!ret) { debug_print("Sending active config\n"); zmq_send(Requester, active_str, active_len - 1, 0); zmq_recv(Requester, buffer, 16, 0); debug_print("Received active receipt\n"); } else { debug_print("Sending empty active config\n"); zmq_send(Requester, empty_string, 0, 0); zmq_recv(Requester, buffer, 16, 0); debug_print("Received active receipt\n"); } free(active_str); FILE *fp_s = popen(GET_SESSION, "r"); getdelim(&session_str, &session_len, '\0', fp_s); pclose(fp_s); debug_print("Sending session config\n"); zmq_send(Requester, session_str, session_len - 1, 0); zmq_recv(Requester, buffer, 16, 0); debug_print("Received session receipt\n"); free(session_str); debug_print("Sending config session pid\n"); zmq_send(Requester, pid_val, strlen(pid_val), 0); zmq_recv(Requester, buffer, 16, 0); debug_print("Received pid receipt\n"); return 0; } int pass_through(char **argv, int ex_index) { char **newargv = NULL; pid_t child_pid; newargv = &argv[ex_index]; if (ex_index > 1) { putenv(argv[ex_index - 1]); } debug_print("pass-through invoked\n"); if ((child_pid=fork()) < 0) { debug_print("fork() failed\n"); return -1; } else if (child_pid == 0) { if (-1 == execv(argv[ex_index], newargv)) { debug_print("pass_through execve failed %s: %s\n", argv[ex_index], strerror(errno)); return -1; } } else if (child_pid > 0) { int status; pid_t wait_pid = waitpid(child_pid, &status, 0); if (wait_pid < 0) { debug_print("waitpid() failed\n"); return -1; } else if (wait_pid == child_pid) { if (WIFEXITED(status)) { debug_print("child exited with code %d\n", WEXITSTATUS(status)); return WEXITSTATUS(status); } } } return 0; } void timer_handler(int signum) { debug_print("timer_handler invoked\n"); timeout = 1; init_alarm = 1; return; } #ifdef _POSIX_MONOTONIC_CLOCK double get_posix_clock_time(void) { struct timespec ts; if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { return (double) (ts.tv_sec + ts.tv_nsec / 1000000000.0); } else { return 0; } } #else double get_posix_clock_time(void) {return (double)0;} #endif