diff --git a/Makefile b/Makefile index 6adb57930..23e060125 100644 --- a/Makefile +++ b/Makefile @@ -1,129 +1,128 @@ TMPL_DIR := templates-cfg OP_TMPL_DIR := templates-op BUILD_DIR := build DATA_DIR := data SHIM_DIR := src/shim LIBS := -lzmq CFLAGS := BUILD_ARCH := $(shell dpkg-architecture -q DEB_BUILD_ARCH) J2LINT := $(shell command -v j2lint 2> /dev/null) config_xml_src = $(wildcard interface-definitions/*.xml.in) config_xml_obj = $(config_xml_src:.xml.in=.xml) op_xml_src = $(wildcard op-mode-definitions/*.xml.in) op_xml_obj = $(op_xml_src:.xml.in=.xml) %.xml: %.xml.in @echo Generating $(BUILD_DIR)/$@ from $< mkdir -p $(BUILD_DIR)/$(dir $@) $(CURDIR)/scripts/transclude-template $< > $(BUILD_DIR)/$@ .PHONY: interface_definitions .ONESHELL: interface_definitions: $(config_xml_obj) mkdir -p $(TMPL_DIR) $(CURDIR)/scripts/override-default $(BUILD_DIR)/interface-definitions find $(BUILD_DIR)/interface-definitions -type f -name "*.xml" | xargs -I {} $(CURDIR)/scripts/build-command-templates {} $(CURDIR)/schema/interface_definition.rng $(TMPL_DIR) || exit 1 $(CURDIR)/python/vyos/xml_ref/generate_cache.py --xml-dir $(BUILD_DIR)/interface-definitions || exit 1 # XXX: delete top level node.def's that now live in other packages # IPSec VPN EAP-RADIUS does not support source-address rm -rf $(TMPL_DIR)/vpn/ipsec/remote-access/radius/source-address # T2472 - EIGRP support rm -rf $(TMPL_DIR)/protocols/eigrp # T2773 - EIGRP support for VRF rm -rf $(TMPL_DIR)/vrf/name/node.tag/protocols/eigrp # XXX: test if there are empty node.def files - this is not allowed as these # could mask help strings or mandatory priority statements find $(TMPL_DIR) -name node.def -type f -empty -exec false {} + || sh -c 'echo "There are empty node.def files! Check your interface definitions." && exit 1' ifeq ($(BUILD_ARCH),arm64) # There is currently no telegraf support in VyOS for ARM64, remove CLI definitions rm -rf $(TMPL_DIR)/service/monitoring/telegraf endif .PHONY: op_mode_definitions .ONESHELL: op_mode_definitions: $(op_xml_obj) mkdir -p $(OP_TMPL_DIR) find $(BUILD_DIR)/op-mode-definitions/ -type f -name "*.xml" | xargs -I {} $(CURDIR)/scripts/build-command-op-templates {} $(CURDIR)/schema/op-mode-definition.rng $(OP_TMPL_DIR) || exit 1 # XXX: delete top level op mode node.def's that now live in other packages rm -f $(OP_TMPL_DIR)/add/node.def rm -f $(OP_TMPL_DIR)/clear/interfaces/node.def rm -f $(OP_TMPL_DIR)/clear/node.def rm -f $(OP_TMPL_DIR)/delete/node.def - rm -f $(OP_TMPL_DIR)/set/node.def # XXX: ping, traceroute and mtr must be able to recursivly call themselves as the # options are provided from the scripts themselves ln -s ../node.tag $(OP_TMPL_DIR)/ping/node.tag/node.tag/ ln -s ../node.tag $(OP_TMPL_DIR)/traceroute/node.tag/node.tag/ ln -s ../node.tag $(OP_TMPL_DIR)/mtr/node.tag/node.tag/ # XXX: test if there are empty node.def files - this is not allowed as these # could mask help strings or mandatory priority statements find $(OP_TMPL_DIR) -name node.def -type f -empty -exec false {} + || sh -c 'echo "There are empty node.def files! Check your interface definitions." && exit 1' .PHONY: vyshim vyshim: $(MAKE) -C $(SHIM_DIR) .PHONY: all all: clean interface_definitions op_mode_definitions check test j2lint vyshim .PHONY: check .ONESHELL: check: @echo "Checking which CLI scripts are not enabled to work with vyos-configd..." @for file in `ls src/conf_mode -I__pycache__` do if ! grep -q $$file data/configd-include.json; then echo "* $$file" fi done .PHONY: clean clean: rm -rf $(BUILD_DIR) rm -rf $(TMPL_DIR) rm -rf $(OP_TMPL_DIR) $(MAKE) -C $(SHIM_DIR) clean .PHONY: test test: set -e; python3 -m compileall -q -x '/vmware-tools/scripts/, /ppp/' . PYTHONPATH=python/ python3 -m "nose" --with-xunit src --with-coverage --cover-erase --cover-xml --cover-package src/conf_mode,src/op_mode,src/completion,src/helpers,src/validators,src/tests --verbose .PHONY: j2lint j2lint: ifndef J2LINT $(error "j2lint binary not found, consider installing: pip install git+https://github.com/aristanetworks/j2lint.git@341b5d5db86") endif $(J2LINT) data/ .PHONY: sonar sonar: sonar-scanner -X -Dsonar.login=${SONAR_TOKEN} .PHONY: docs .ONESHELL: docs: sphinx-apidoc -o sphinx/source/ python/ cd sphinx/ PYTHONPATH=../python make html deb: dpkg-buildpackage -uc -us -tc -b .PHONY: schema schema: trang -I rnc -O rng schema/interface_definition.rnc schema/interface_definition.rng trang -I rnc -O rng schema/op-mode-definition.rnc schema/op-mode-definition.rng diff --git a/data/templates/grub/grub_common.j2 b/data/templates/grub/grub_common.j2 new file mode 100644 index 000000000..278ffbf2c --- /dev/null +++ b/data/templates/grub/grub_common.j2 @@ -0,0 +1,23 @@ +# load EFI video modules +if [ "${grub_platform}" == "efi" ]; then + insmod efi_gop + insmod efi_uga +fi + +# create and activate serial console +function setup_serial { + # initialize the first serial port by default + if [ "${console_type}" == "ttyS" ]; then + serial --unit=${console_num} + else + serial --unit=0 + fi + terminal_output --append serial console + terminal_input --append serial console +} + +setup_serial + +{% if search_root %} +{{ search_root }} +{% endif %} diff --git a/data/templates/grub/grub_compat.j2 b/data/templates/grub/grub_compat.j2 new file mode 100644 index 000000000..887d5d0bd --- /dev/null +++ b/data/templates/grub/grub_compat.j2 @@ -0,0 +1,63 @@ +{# j2lint: disable=S6 #} +### Generated by VyOS image-tools v.{{ tools_version }} ### +{% macro menu_name(mode) -%} +{% if mode == 'normal' -%} + VyOS +{%- elif mode == 'pw_reset' -%} + Lost password change +{%- else -%} + Unknown +{%- endif %} +{%- endmacro %} +{% macro console_name(type) -%} +{% if type == 'tty' -%} + KVM +{%- elif type == 'ttyS' -%} + Serial +{%- elif type == 'ttyUSB' -%} + USB +{%- else -%} + Unknown +{%- endif %} +{%- endmacro %} +{% macro console_opts(type) -%} +{% if type == 'tty' -%} + console=ttyS0,115200 console=tty0 +{%- elif type == 'ttyS' -%} + console=tty0 console=ttyS0,115200 +{%- elif type == 'ttyUSB' -%} + console=tty0 console=ttyUSB0,115200 +{%- else -%} + console=tty0 console=ttyS0,115200 +{%- endif %} +{%- endmacro %} +{% macro passwd_opts(mode) -%} +{% if mode == 'pw_reset' -%} + init=/opt/vyatta/sbin/standalone_root_pw_reset +{%- endif %} +{%- endmacro %} +set default={{ default }} +set timeout={{ timeout }} +{% if console_type == 'ttyS' %} +serial --unit={{ console_num }} --speed=115200 +{% else %} +serial --unit=0 --speed=115200 +{% endif %} +terminal_output --append serial +terminal_input serial console +{% for mod in modules %} +insmod {{ mod }} +{% endfor %} +{% if root %} +set root={{ root }} +{% endif %} +{% if search_root %} +{{ search_root }} +{% endif %} + +{% for v in versions %} +menuentry "{{ menu_name(v.bootmode) }} {{ v.version }} ({{ console_name(v.console_type) }} console)" { + linux /boot/{{ v.version }}/vmlinuz {{ v.boot_opts }} {{ console_opts(v.console_type) }} {{ passwd_opts(v.bootmode) }} + initrd /boot/{{ v.version }}/initrd.img +} +{% endfor %} diff --git a/data/templates/grub/grub_main.j2 b/data/templates/grub/grub_main.j2 new file mode 100644 index 000000000..0c7ea0202 --- /dev/null +++ b/data/templates/grub/grub_main.j2 @@ -0,0 +1,7 @@ +load_env +insmod regexp + +for cfgfile in ${prefix}/grub.cfg.d/*-autoload.cfg +do + source ${cfgfile} +done diff --git a/data/templates/grub/grub_menu.j2 b/data/templates/grub/grub_menu.j2 new file mode 100644 index 000000000..e73005f5d --- /dev/null +++ b/data/templates/grub/grub_menu.j2 @@ -0,0 +1,5 @@ +for cfgfile in ${config_directory}/vyos-versions/*.cfg +do + source "${cfgfile}" +done +source ${config_directory}/50-vyos-options.cfg diff --git a/data/templates/grub/grub_modules.j2 b/data/templates/grub/grub_modules.j2 new file mode 100644 index 000000000..24b540c9d --- /dev/null +++ b/data/templates/grub/grub_modules.j2 @@ -0,0 +1,3 @@ +{% for mod_name in mods_list %} +insmod {{ mod_name | e }} +{% endfor %} diff --git a/data/templates/grub/grub_options.j2 b/data/templates/grub/grub_options.j2 new file mode 100644 index 000000000..c8a1472e1 --- /dev/null +++ b/data/templates/grub/grub_options.j2 @@ -0,0 +1,52 @@ +submenu "Boot options" { + submenu "Select boot mode" { + menuentry "Normal" { + set bootmode="normal" + export bootmode + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + menuentry "Password reset" { + set bootmode="pw_reset" + export bootmode + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + menuentry "System recovery" { + set bootmode="recovery" + export bootmode + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + menuentry "Load the whole root filesystem to RAM" { + set boot_toram="yes" + export boot_toram + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + } + submenu "Select console type" { + menuentry "tty (graphical)" { + set console_type="tty" + export console_type + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + menuentry "ttyS (serial)" { + set console_type="ttyS" + export console_type + setup_serial + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + menuentry "ttyUSB (USB serial)" { + set console_type="ttyUSB" + export console_type + setup_serial + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + } + menuentry "Enter console number" { + read console_num + export console_num + setup_serial + configfile ${prefix}/grub.cfg.d/*vyos-menu*.cfg + } + menuentry "Current: boot mode: ${bootmode}, console: ${console_type}${console_num}" { + echo + } +} diff --git a/data/templates/grub/grub_vars.j2 b/data/templates/grub/grub_vars.j2 new file mode 100644 index 000000000..e0002e8d8 --- /dev/null +++ b/data/templates/grub/grub_vars.j2 @@ -0,0 +1,4 @@ +{% for var_name, var_value in vars.items() %} +set {{ var_name | e }}="{{ var_value | e }}" +export {{ var_name | e }} +{% endfor %} diff --git a/data/templates/grub/grub_vyos_version.j2 b/data/templates/grub/grub_vyos_version.j2 new file mode 100644 index 000000000..97fbe8473 --- /dev/null +++ b/data/templates/grub/grub_vyos_version.j2 @@ -0,0 +1,22 @@ +{% set boot_opts_default = "boot=live rootdelay=5 noautologin net.ifnames=0 biosdevname=0 vyos-union=/boot/" + version_name %} +{% if boot_opts != '' %} +{% set boot_opts_rendered = boot_opts %} +{% else %} +{% set boot_opts_rendered = boot_opts_default %} +{% endif %} +menuentry "{{ version_name }}" --id {{ version_uuid }} { + set boot_opts="{{ boot_opts_rendered }}" + # load rootfs to RAM + if [ "${boot_toram}" == "yes" ]; then + set boot_opts="${boot_opts} toram" + fi + if [ "${bootmode}" == "pw_reset" ]; then + set boot_opts="${boot_opts} console=${console_type}${console_num} init=/usr/libexec/vyos/system/standalone_root_pw_reset" + elif [ "${bootmode}" == "recovery" ]; then + set boot_opts="${boot_opts} console=${console_type}${console_num} init=/usr/bin/busybox init" + else + set boot_opts="${boot_opts} console=${console_type}${console_num}" + fi + linux "/boot/{{ version_name }}/vmlinuz" ${boot_opts} + initrd "/boot/{{ version_name }}/initrd.img" +} diff --git a/debian/vyos-1x.links b/debian/vyos-1x.links index 0e2d1b841..402c91306 100644 --- a/debian/vyos-1x.links +++ b/debian/vyos-1x.links @@ -1 +1,2 @@ /etc/netplug/linkup.d/vyos-python-helper /etc/netplug/linkdown.d/vyos-python-helper +/usr/libexec/vyos/system/standalone_root_pw_reset /opt/vyatta/sbin/standalone_root_pw_reset diff --git a/op-mode-definitions/add-system-image.xml.in b/op-mode-definitions/add-system-image.xml.in deleted file mode 100644 index 67d8aa3b4..000000000 --- a/op-mode-definitions/add-system-image.xml.in +++ /dev/null @@ -1,62 +0,0 @@ -<?xml version="1.0"?> -<interfaceDefinition> - <node name="add"> - <children> - <node name="system"> - <properties> - <help>Add item to a system facility</help> - </properties> - <children> - <tagNode name="image"> - <properties> - <help>Add a new image to the system</help> - <completionHelp> - <list>/path/to/vyos-image.iso "http://example.com/vyos-image.iso"</list> - </completionHelp> - </properties> - <command>sudo ${vyatta_sbindir}/install-image --url "${4}"</command> - <children> - <tagNode name="vrf"> - <properties> - <help>Download image via specified VRF</help> - <completionHelp> - <path>vrf name</path> - </completionHelp> - </properties> - <command>sudo ${vyatta_sbindir}/install-image --url "${4}" --vrf "${6}"</command> - <children> - <tagNode name="username"> - <properties> - <help>Username for authentication</help> - </properties> - <children> - <tagNode name="password"> - <properties> - <help>Password to use with authentication</help> - </properties> - <command>sudo ${vyatta_sbindir}/install-image --url "${4}" --vrf "${6}" --username "${8}" --password "${10}"</command> - </tagNode> - </children> - </tagNode> - </children> - </tagNode> - <tagNode name="username"> - <properties> - <help>Username for authentication</help> - </properties> - <children> - <tagNode name="password"> - <properties> - <help>Password to use with authentication</help> - </properties> - <command>sudo ${vyatta_sbindir}/install-image --url "${4}" --username "${6}" --password "${8}"</command> - </tagNode> - </children> - </tagNode> - </children> - </tagNode> - </children> - </node> - </children> - </node> -</interfaceDefinition> diff --git a/op-mode-definitions/system-image.xml.in b/op-mode-definitions/system-image.xml.in new file mode 100644 index 000000000..c131087be --- /dev/null +++ b/op-mode-definitions/system-image.xml.in @@ -0,0 +1,201 @@ +<?xml version="1.0" encoding="UTF-8"?> +<interfaceDefinition> + <node name="add"> + <properties> + <help>Add an object</help> + </properties> + <children> + <node name="system"> + <properties> + <help>Add item to a system facility</help> + </properties> + <children> + <tagNode name="image"> + <properties> + <help>Add a new image to the system</help> + <completionHelp> + <list>/path/to/vyos-image.iso "http://example.com/vyos-image.iso"</list> + </completionHelp> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_installer.py --action add --image-path "${4}"</command> + <children> + <tagNode name="vrf"> + <properties> + <help>Download image via specified VRF</help> + <completionHelp> + <path>vrf name</path> + </completionHelp> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_installer.py --action add --image-path "${4}" --vrf "${6}"</command> + <children> + <tagNode name="username"> + <properties> + <help>Username for authentication</help> + </properties> + <children> + <tagNode name="password"> + <properties> + <help>Password to use with authentication</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_installer.py --action add --image-path "${4}" --vrf "${6}" --username "${8}" --password "${10}"</command> + </tagNode> + </children> + </tagNode> + </children> + </tagNode> + <tagNode name="username"> + <properties> + <help>Username for authentication</help> + </properties> + <children> + <tagNode name="password"> + <properties> + <help>Password to use with authentication</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_installer.py --action add --image-path "${4}" --username "${6}" --password "${8}"</command> + </tagNode> + </children> + </tagNode> + </children> + </tagNode> + </children> + </node> + </children> + </node> + <node name="set"> + <properties> + <help>Install a new system</help> + </properties> + <children> + <node name="system"> + <properties> + <help>Set system operational parameters</help> + </properties> + <children> + <node name="image"> + <properties> + <help>Set system image parameters</help> + </properties> + <children> + <node name="default-boot"> + <properties> + <help>Set default image to boot.</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_manager.py --action set</command> + </node> + <tagNode name="default-boot"> + <properties> + <help>Set default image to boot.</help> + <completionHelp> + <script>sudo ${vyos_op_scripts_dir}/image_manager.py --action list</script> + </completionHelp> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_manager.py --action set --image-name "${5}"</command> + </tagNode> + </children> + </node> + </children> + </node> + </children> + </node> + <node name="install"> + <properties> + <help>Install a new system</help> + </properties> + <children> + <node name="image"> + <properties> + <help>Install new system image to hard drive</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_installer.py --action install</command> + </node> + </children> + </node> + <node name="delete"> + <properties> + <help>Delete an object</help> + </properties> + <children> + <node name="system"> + <properties> + <help>Delete system objects</help> + </properties> + <children> + <node name="image"> + <properties> + <help>Remove an installed image from the system</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_manager.py --action delete</command> + </node> + <tagNode name="image"> + <properties> + <help>Remove an installed image from the system</help> + <completionHelp> + <script>sudo ${vyos_op_scripts_dir}/image_manager.py --action list</script> + </completionHelp> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_manager.py --action delete --image-name "${4}"</command> + </tagNode> + </children> + </node> + </children> + </node> + <node name="rename"> + <properties> + <help>Rename an object</help> + </properties> + <children> + <node name="system"> + <properties> + <help>Rename a system object</help> + </properties> + <children> + <tagNode name="image"> + <properties> + <help>System image to rename</help> + <completionHelp> + <script>sudo ${vyos_op_scripts_dir}/image_manager.py --action list</script> + </completionHelp> + </properties> + <children> + <tagNode name="to"> + <properties> + <help>A new name for an image</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_manager.py --action rename --image-name "${4}" --image-new-name "${6}"</command> + </tagNode> + </children> + </tagNode> + </children> + </node> + </children> + </node> + <node name="show"> + <properties> + <help>Rename an object</help> + </properties> + <children> + <node name="system"> + <properties> + <help>Show system information</help> + </properties> + <children> + <node name="image"> + <properties> + <help>Show installed VyOS images</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_info.py show_images_summary</command> + <children> + <node name="details"> + <properties> + <help>Show details about installed VyOS images</help> + </properties> + <command>sudo ${vyos_op_scripts_dir}/image_info.py show_images_details</command> + </node> + </children> + </node> + </children> + </node> + </children> + </node> +</interfaceDefinition> diff --git a/python/vyos/configsession.py b/python/vyos/configsession.py index 9802ebae4..90842b749 100644 --- a/python/vyos/configsession.py +++ b/python/vyos/configsession.py @@ -1,247 +1,249 @@ # configsession -- the write API for the VyOS running config # Copyright (C) 2019-2023 VyOS maintainers and contributors # # This library is free software; you can redistribute it and/or modify it under the terms of # the GNU Lesser General Public License as published by the Free Software Foundation; # either version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; # without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License along with this library; # if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os import re import sys import subprocess from vyos.utils.process import is_systemd_service_running from vyos.utils.dict import dict_to_paths CLI_SHELL_API = '/bin/cli-shell-api' SET = '/opt/vyatta/sbin/my_set' DELETE = '/opt/vyatta/sbin/my_delete' COMMENT = '/opt/vyatta/sbin/my_comment' COMMIT = '/opt/vyatta/sbin/my_commit' DISCARD = '/opt/vyatta/sbin/my_discard' SHOW_CONFIG = ['/bin/cli-shell-api', 'showConfig'] LOAD_CONFIG = ['/bin/cli-shell-api', 'loadFile'] MIGRATE_LOAD_CONFIG = ['/usr/libexec/vyos/vyos-load-config.py'] SAVE_CONFIG = ['/usr/libexec/vyos/vyos-save-config.py'] -INSTALL_IMAGE = ['/opt/vyatta/sbin/install-image', '--url'] -REMOVE_IMAGE = ['/opt/vyatta/bin/vyatta-boot-image.pl', '--del'] +INSTALL_IMAGE = ['/usr/libexec/vyos/op_mode/image_installer.py', + '--action', 'add', '--no-prompt', '--image-path'] +REMOVE_IMAGE = ['/usr/libexec/vyos/op_mode/image_manager.py', + '--action', 'delete', '--no-prompt', '--image-name'] GENERATE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'generate'] SHOW = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'show'] RESET = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reset'] REBOOT = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'reboot'] POWEROFF = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'poweroff'] OP_CMD_ADD = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'add'] OP_CMD_DELETE = ['/opt/vyatta/bin/vyatta-op-cmd-wrapper', 'delete'] # Default "commit via" string APP = "vyos-http-api" # When started as a service rather than from a user shell, # the process lacks the VyOS-specific environment that comes # from bash configs, so we have to inject it # XXX: maybe it's better to do via a systemd environment file def inject_vyos_env(env): env['VYATTA_CFG_GROUP_NAME'] = 'vyattacfg' env['VYATTA_USER_LEVEL_DIR'] = '/opt/vyatta/etc/shell/level/admin' env['VYATTA_PROCESS_CLIENT'] = 'gui2_rest' env['VYOS_HEADLESS_CLIENT'] = 'vyos_http_api' env['vyatta_bindir']= '/opt/vyatta/bin' env['vyatta_cfg_templates'] = '/opt/vyatta/share/vyatta-cfg/templates' env['vyatta_configdir'] = '/opt/vyatta/config' env['vyatta_datadir'] = '/opt/vyatta/share' env['vyatta_datarootdir'] = '/opt/vyatta/share' env['vyatta_libdir'] = '/opt/vyatta/lib' env['vyatta_libexecdir'] = '/opt/vyatta/libexec' env['vyatta_op_templates'] = '/opt/vyatta/share/vyatta-op/templates' env['vyatta_prefix'] = '/opt/vyatta' env['vyatta_sbindir'] = '/opt/vyatta/sbin' env['vyatta_sysconfdir'] = '/opt/vyatta/etc' env['vyos_bin_dir'] = '/usr/bin' env['vyos_cfg_templates'] = '/opt/vyatta/share/vyatta-cfg/templates' env['vyos_completion_dir'] = '/usr/libexec/vyos/completion' env['vyos_configdir'] = '/opt/vyatta/config' env['vyos_conf_scripts_dir'] = '/usr/libexec/vyos/conf_mode' env['vyos_datadir'] = '/opt/vyatta/share' env['vyos_datarootdir']= '/opt/vyatta/share' env['vyos_libdir'] = '/opt/vyatta/lib' env['vyos_libexec_dir'] = '/usr/libexec/vyos' env['vyos_op_scripts_dir'] = '/usr/libexec/vyos/op_mode' env['vyos_op_templates'] = '/opt/vyatta/share/vyatta-op/templates' env['vyos_prefix'] = '/opt/vyatta' env['vyos_sbin_dir'] = '/usr/sbin' env['vyos_validators_dir'] = '/usr/libexec/vyos/validators' # if running the vyos-configd daemon, inject the vyshim env var if is_systemd_service_running('vyos-configd.service'): env['vyshim'] = '/usr/sbin/vyshim' return env class ConfigSessionError(Exception): pass class ConfigSession(object): """ The write API of VyOS. """ def __init__(self, session_id, app=APP): """ Creates a new config session. Args: session_id (str): Session identifier app (str): Application name, purely informational Note: The session identifier MUST be globally unique within the system. The best practice is to only have one ConfigSession object per process and used the PID for the session identifier. """ env_str = subprocess.check_output([CLI_SHELL_API, 'getSessionEnv', str(session_id)]) self.__session_id = session_id # Extract actual variables from the chunk of shell it outputs # XXX: it's better to extend cli-shell-api to provide easily readable output env_list = re.findall(r'([A-Z_]+)=([^;\s]+)', env_str.decode()) session_env = os.environ session_env = inject_vyos_env(session_env) for k, v in env_list: session_env[k] = v self.__session_env = session_env self.__session_env["COMMIT_VIA"] = app self.__run_command([CLI_SHELL_API, 'setupSession']) def __del__(self): try: output = subprocess.check_output([CLI_SHELL_API, 'teardownSession'], env=self.__session_env).decode().strip() if output: print("cli-shell-api teardownSession output for sesion {0}: {1}".format(self.__session_id, output), file=sys.stderr) except Exception as e: print("Could not tear down session {0}: {1}".format(self.__session_id, e), file=sys.stderr) def __run_command(self, cmd_list): p = subprocess.Popen(cmd_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=self.__session_env) (stdout_data, stderr_data) = p.communicate() output = stdout_data.decode() result = p.wait() if result != 0: raise ConfigSessionError(output) return output def get_session_env(self): return self.__session_env def set(self, path, value=None): if not value: value = [] else: value = [value] self.__run_command([SET] + path + value) def set_section(self, path: list, d: dict): try: for p in dict_to_paths(d): self.set(path + p) except (ValueError, ConfigSessionError) as e: raise ConfigSessionError(e) def delete(self, path, value=None): if not value: value = [] else: value = [value] self.__run_command([DELETE] + path + value) def load_section(self, path: list, d: dict): try: self.delete(path) if d: for p in dict_to_paths(d): self.set(path + p) except (ValueError, ConfigSessionError) as e: raise ConfigSessionError(e) def comment(self, path, value=None): if not value: value = [""] else: value = [value] self.__run_command([COMMENT] + path + value) def commit(self): out = self.__run_command([COMMIT]) return out def discard(self): self.__run_command([DISCARD]) def show_config(self, path, format='raw'): config_data = self.__run_command(SHOW_CONFIG + path) if format == 'raw': return config_data def load_config(self, file_path): out = self.__run_command(LOAD_CONFIG + [file_path]) return out def migrate_and_load_config(self, file_path): out = self.__run_command(MIGRATE_LOAD_CONFIG + [file_path]) return out def save_config(self, file_path): out = self.__run_command(SAVE_CONFIG + [file_path]) return out def install_image(self, url): out = self.__run_command(INSTALL_IMAGE + [url]) return out def remove_image(self, name): out = self.__run_command(REMOVE_IMAGE + [name]) return out def generate(self, path): out = self.__run_command(GENERATE + path) return out def show(self, path): out = self.__run_command(SHOW + path) return out def reboot(self, path): out = self.__run_command(REBOOT + path) return out def reset(self, path): out = self.__run_command(RESET + path) return out def poweroff(self, path): out = self.__run_command(POWEROFF + path) return out def add_container_image(self, name): out = self.__run_command(OP_CMD_ADD + ['container', 'image'] + [name]) return out def delete_container_image(self, name): out = self.__run_command(OP_CMD_DELETE + ['container', 'image'] + [name]) return out def show_container_image(self): out = self.__run_command(SHOW + ['container', 'image']) return out diff --git a/python/vyos/remote.py b/python/vyos/remote.py index f86d2ef35..b1efcd10b 100644 --- a/python/vyos/remote.py +++ b/python/vyos/remote.py @@ -1,469 +1,471 @@ # Copyright 2021 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os import pwd import shutil import socket import ssl import stat import sys import tempfile import urllib.parse from contextlib import contextmanager from pathlib import Path from ftplib import FTP from ftplib import FTP_TLS from paramiko import SSHClient, SSHException from paramiko import MissingHostKeyPolicy from requests import Session from requests.adapters import HTTPAdapter from requests.packages.urllib3 import PoolManager from vyos.progressbar import Progressbar from vyos.utils.io import ask_yes_no from vyos.utils.io import is_interactive from vyos.utils.io import print_error from vyos.utils.misc import begin from vyos.utils.process import cmd, rc_cmd from vyos.version import get_version CHUNK_SIZE = 8192 class InteractivePolicy(MissingHostKeyPolicy): """ Paramiko policy for interactively querying the user on whether to proceed with SSH connections to unknown hosts. """ def missing_host_key(self, client, hostname, key): print_error(f"Host '{hostname}' not found in known hosts.") print_error('Fingerprint: ' + key.get_fingerprint().hex()) if is_interactive() and ask_yes_no('Do you wish to continue?'): if client._host_keys_filename\ and ask_yes_no('Do you wish to permanently add this host/key pair to known hosts?'): client._host_keys.add(hostname, key.get_name(), key) client.save_host_keys(client._host_keys_filename) else: raise SSHException(f"Cannot connect to unknown host '{hostname}'.") class SourceAdapter(HTTPAdapter): """ urllib3 transport adapter for setting source addresses per session. """ def __init__(self, source_pair, *args, **kwargs): # A source pair is a tuple of a source host string and source port respectively. # Supply '' and 0 respectively for default values. self._source_pair = source_pair super(SourceAdapter, self).__init__(*args, **kwargs) def init_poolmanager(self, connections, maxsize, block=False): self.poolmanager = PoolManager( num_pools=connections, maxsize=maxsize, block=block, source_address=self._source_pair) @contextmanager def umask(mask: int): """ Context manager that temporarily sets the process umask. """ import os oldmask = os.umask(mask) try: yield finally: os.umask(oldmask) def check_storage(path, size): """ Check whether `path` has enough storage space for a transfer of `size` bytes. """ path = os.path.abspath(os.path.expanduser(path)) directory = path if os.path.isdir(path) else (os.path.dirname(os.path.expanduser(path)) or os.getcwd()) # `size` can be None or 0 to indicate unknown size. if not size: print_error('Warning: Cannot determine size of remote file. Bravely continuing regardless.') return if size < 1024 * 1024: print_error(f'The file is {size / 1024.0:.3f} KiB.') else: print_error(f'The file is {size / (1024.0 * 1024.0):.3f} MiB.') # Will throw `FileNotFoundError' if `directory' is absent. if size > shutil.disk_usage(directory).free: raise OSError(f'Not enough disk space available in "{directory}".') class FtpC: def __init__(self, url, progressbar=False, check_space=False, source_host='', source_port=0, timeout=10): self.secure = url.scheme == 'ftps' self.hostname = url.hostname self.path = url.path self.username = url.username or os.getenv('REMOTE_USERNAME', 'anonymous') self.password = url.password or os.getenv('REMOTE_PASSWORD', '') self.port = url.port or 21 self.source = (source_host, source_port) self.progressbar = progressbar self.check_space = check_space self.timeout = timeout def _establish(self): if self.secure: return FTP_TLS(source_address=self.source, context=ssl.create_default_context(), timeout=self.timeout) else: return FTP(source_address=self.source, timeout=self.timeout) def download(self, location: str): # Open the file upfront before establishing connection. with open(location, 'wb') as f, self._establish() as conn: conn.connect(self.hostname, self.port) conn.login(self.username, self.password) # Set secure connection over TLS. if self.secure: conn.prot_p() # Almost all FTP servers support the `SIZE' command. size = conn.size(self.path) if self.check_space: check_storage(path, size) # No progressbar if we can't determine the size or if the file is too small. if self.progressbar and size and size > CHUNK_SIZE: with Progressbar(CHUNK_SIZE / size) as p: callback = lambda block: begin(f.write(block), p.increment()) conn.retrbinary('RETR ' + self.path, callback, CHUNK_SIZE) else: conn.retrbinary('RETR ' + self.path, f.write, CHUNK_SIZE) def upload(self, location: str): size = os.path.getsize(location) with open(location, 'rb') as f, self._establish() as conn: conn.connect(self.hostname, self.port) conn.login(self.username, self.password) if self.secure: conn.prot_p() if self.progressbar and size and size > CHUNK_SIZE: with Progressbar(CHUNK_SIZE / size) as p: conn.storbinary('STOR ' + self.path, f, CHUNK_SIZE, lambda block: p.increment()) else: conn.storbinary('STOR ' + self.path, f, CHUNK_SIZE) class SshC: known_hosts = os.path.expanduser('~/.ssh/known_hosts') def __init__(self, url, progressbar=False, check_space=False, source_host='', source_port=0, timeout=10.0): self.hostname = url.hostname self.path = url.path self.username = url.username or os.getenv('REMOTE_USERNAME') self.password = url.password or os.getenv('REMOTE_PASSWORD') self.port = url.port or 22 self.source = (source_host, source_port) self.progressbar = progressbar self.check_space = check_space self.timeout = timeout def _establish(self): ssh = SSHClient() ssh.load_system_host_keys() # Try to load from a user-local known hosts file if one exists. if os.path.exists(self.known_hosts): ssh.load_host_keys(self.known_hosts) ssh.set_missing_host_key_policy(InteractivePolicy()) # `socket.create_connection()` automatically picks a NIC and an IPv4/IPv6 address family # for us on dual-stack systems. sock = socket.create_connection((self.hostname, self.port), self.timeout, self.source) ssh.connect(self.hostname, self.port, self.username, self.password, sock=sock) return ssh def download(self, location: str): with self._establish() as ssh, ssh.open_sftp() as sftp: if self.check_space: check_storage(location, sftp.stat(self.path).st_size) if self.progressbar: with Progressbar() as p: sftp.get(self.path, location, callback=p.progress) else: sftp.get(self.path, location) def upload(self, location: str): with self._establish() as ssh, ssh.open_sftp() as sftp: try: # If the remote path is a directory, use the original filename. if stat.S_ISDIR(sftp.stat(self.path).st_mode): path = os.path.join(self.path, os.path.basename(location)) # A file exists at this destination. We're simply going to clobber it. else: path = self.path # This path doesn't point at any existing file. We can freely use this filename. except IOError: path = self.path finally: if self.progressbar: with Progressbar() as p: sftp.put(location, path, callback=p.progress) else: sftp.put(location, path) class HttpC: def __init__(self, url, progressbar=False, check_space=False, source_host='', source_port=0, timeout=10.0): self.urlstring = urllib.parse.urlunsplit(url) self.progressbar = progressbar self.check_space = check_space self.source_pair = (source_host, source_port) self.username = url.username or os.getenv('REMOTE_USERNAME') self.password = url.password or os.getenv('REMOTE_PASSWORD') self.timeout = timeout def _establish(self): session = Session() session.mount(self.urlstring, SourceAdapter(self.source_pair)) session.headers.update({'User-Agent': 'VyOS/' + get_version()}) if self.username: session.auth = self.username, self.password return session def download(self, location: str): with self._establish() as s: # We ask for uncompressed downloads so that we don't have to deal with decoding. # Not only would it potentially mess up with the progress bar but # `shutil.copyfileobj(request.raw, file)` does not handle automatic decoding. s.headers.update({'Accept-Encoding': 'identity'}) with s.head(self.urlstring, allow_redirects=True, timeout=self.timeout) as r: # Abort early if the destination is inaccessible. r.raise_for_status() # If the request got redirected, keep the last URL we ended up with. final_urlstring = r.url if r.history and self.progressbar: print_error('Redirecting to ' + final_urlstring) # Check for the prospective file size. try: size = int(r.headers['Content-Length']) # In case the server does not supply the header. except KeyError: size = None if self.check_space: check_storage(location, size) with s.get(final_urlstring, stream=True, timeout=self.timeout) as r, open(location, 'wb') as f: if self.progressbar and size: with Progressbar(CHUNK_SIZE / size) as p: for chunk in iter(lambda: begin(p.increment(), r.raw.read(CHUNK_SIZE)), b''): f.write(chunk) else: # We'll try to stream the download directly with `copyfileobj()` so that large # files (like entire VyOS images) don't occupy much memory. shutil.copyfileobj(r.raw, f) def upload(self, location: str): # Does not yet support progressbars. with self._establish() as s, open(location, 'rb') as f: s.post(self.urlstring, data=f, allow_redirects=True, timeout=self.timeout) class TftpC: # We simply allow `curl` to take over because # 1. TFTP is rather simple. # 2. Since there's no concept authentication, we don't need to deal with keys/passwords. # 3. It would be a waste to import, audit and maintain a third-party library for TFTP. # 4. I'd rather not implement the entire protocol here, no matter how simple it is. def __init__(self, url, progressbar=False, check_space=False, source_host=None, source_port=0, timeout=10): source_option = f'--interface {source_host} --local-port {source_port}' if source_host else '' progress_flag = '--progress-bar' if progressbar else '-s' self.command = f'curl {source_option} {progress_flag} --connect-timeout {timeout}' self.urlstring = urllib.parse.urlunsplit(url) def download(self, location: str): with open(location, 'wb') as f: f.write(cmd(f'{self.command} "{self.urlstring}"').encode()) def upload(self, location: str): with open(location, 'rb') as f: cmd(f'{self.command} -T - "{self.urlstring}"', input=f.read()) class GitC: def __init__(self, url, progressbar=False, check_space=False, source_host=None, source_port=0, timeout=10, ): self.command = 'git' self.url = url self.urlstring = urllib.parse.urlunsplit(url) if self.urlstring.startswith("git+"): self.urlstring = self.urlstring.replace("git+", "", 1) def download(self, location: str): raise NotImplementedError("not supported") @umask(0o077) def upload(self, location: str): scheme = self.url.scheme _, _, scheme = scheme.partition("+") netloc = self.url.netloc url = Path(self.url.path).parent with tempfile.TemporaryDirectory(prefix="git-commit-archive-") as directory: # Determine username, fullname, email for Git commit pwd_entry = pwd.getpwuid(os.getuid()) user = pwd_entry.pw_name name = pwd_entry.pw_gecos.split(",")[0] or user fqdn = socket.getfqdn() email = f"{user}@{fqdn}" # environment vars for our git commands env = { "GIT_TERMINAL_PROMPT": "0", "GIT_AUTHOR_NAME": name, "GIT_AUTHOR_EMAIL": email, "GIT_COMMITTER_NAME": name, "GIT_COMMITTER_EMAIL": email, } # build ssh command for git ssh_command = ["ssh"] # if we are not interactive, we use StrictHostKeyChecking=yes to avoid any prompts if not sys.stdout.isatty(): ssh_command += ["-o", "StrictHostKeyChecking=yes"] env["GIT_SSH_COMMAND"] = " ".join(ssh_command) # git clone path_repository = Path(directory) / "repository" scheme = f"{scheme}://" if scheme else "" rc, out = rc_cmd( [self.command, "clone", f"{scheme}{netloc}{url}", str(path_repository), "--depth=1"], env=env, shell=False, ) if rc: raise Exception(out) # git add filename = Path(Path(self.url.path).name).stem dst = path_repository / filename shutil.copy2(location, dst) rc, out = rc_cmd( [self.command, "-C", str(path_repository), "add", filename], env=env, shell=False, ) # git commit -m commit_message = os.environ.get("COMMIT_COMMENT", "commit") rc, out = rc_cmd( [self.command, "-C", str(path_repository), "commit", "-m", commit_message], env=env, shell=False, ) # git push rc, out = rc_cmd( [self.command, "-C", str(path_repository), "push"], env=env, shell=False, ) if rc: raise Exception(out) def urlc(urlstring, *args, **kwargs): """ Dynamically dispatch the appropriate protocol class. """ url_classes = { "http": HttpC, "https": HttpC, "ftp": FtpC, "ftps": FtpC, "sftp": SshC, "ssh": SshC, "scp": SshC, "tftp": TftpC, "git": GitC, } url = urllib.parse.urlsplit(urlstring) scheme, _, _ = url.scheme.partition("+") try: return url_classes[scheme](url, *args, **kwargs) except KeyError: raise ValueError(f'Unsupported URL scheme: "{scheme}"') def download(local_path, urlstring, progressbar=False, check_space=False, - source_host='', source_port=0, timeout=10.0): + source_host='', source_port=0, timeout=10.0, raise_error=False): try: progressbar = progressbar and is_interactive() urlc(urlstring, progressbar, check_space, source_host, source_port, timeout).download(local_path) except Exception as err: + if raise_error: + raise print_error(f'Unable to download "{urlstring}": {err}') except KeyboardInterrupt: print_error('\nDownload aborted by user.') def upload(local_path, urlstring, progressbar=False, source_host='', source_port=0, timeout=10.0): try: progressbar = progressbar and is_interactive() urlc(urlstring, progressbar, False, source_host, source_port, timeout).upload(local_path) except Exception as err: print_error(f'Unable to upload "{urlstring}": {err}') except KeyboardInterrupt: print_error('\nUpload aborted by user.') def get_remote_config(urlstring, source_host='', source_port=0): """ Quietly download a file and return it as a string. """ temp = tempfile.NamedTemporaryFile(delete=False).name try: download(temp, urlstring, False, False, source_host, source_port) with open(temp, 'r') as f: return f.read() finally: os.remove(temp) diff --git a/python/vyos/system/__init__.py b/python/vyos/system/__init__.py new file mode 100644 index 000000000..0c91330ba --- /dev/null +++ b/python/vyos/system/__init__.py @@ -0,0 +1,18 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +__all_: list[str] = ['disk', 'grub', 'image'] +# define image-tools version +SYSTEM_CFG_VER = 1 diff --git a/python/vyos/system/compat.py b/python/vyos/system/compat.py new file mode 100644 index 000000000..319c3dabf --- /dev/null +++ b/python/vyos/system/compat.py @@ -0,0 +1,316 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile, MULTILINE, DOTALL +from functools import wraps +from copy import deepcopy +from typing import Union + +from vyos.system import disk, grub, image, SYSTEM_CFG_VER +from vyos.template import render + +TMPL_GRUB_COMPAT: str = 'grub/grub_compat.j2' + +# define regexes and variables +REGEX_VERSION = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/[^}]*}' +REGEX_MENUENTRY = r'^menuentry "[^\n]*{\n[^}]*\s+linux /boot/(?P<version>\S+)/vmlinuz (?P<options>[^\n]+)\n[^}]*}' +REGEX_CONSOLE = r'^.*console=(?P<console_type>[^\s\d]+)(?P<console_num>[\d]+).*$' +REGEX_SANIT_CONSOLE = r'\ ?console=[^\s\d]+[\d]+(,\d+)?\ ?' +REGEX_SANIT_INIT = r'\ ?init=\S*\ ?' +REGEX_SANIT_QUIET = r'\ ?quiet\ ?' +PW_RESET_OPTION = 'init=/opt/vyatta/sbin/standalone_root_pw_reset' + + +class DowngradingImageTools(Exception): + """Raised when attempting to add an image with an earlier version + of image-tools than the current system, as indicated by the value + of SYSTEM_CFG_VER or absence thereof.""" + pass + + +def mode(): + if grub.get_cfg_ver() >= SYSTEM_CFG_VER: + return False + + return True + + +def find_versions(menu_entries: list) -> list: + """Find unique VyOS versions from menu entries + + Args: + menu_entries (list): a list with menu entries + + Returns: + list: List of installed versions + """ + versions = [] + for vyos_ver in menu_entries: + versions.append(vyos_ver.get('version')) + # remove duplicates + versions = list(set(versions)) + return versions + + +def filter_unparsed(grub_path: str) -> str: + """Find currently installed VyOS version + + Args: + grub_path (str): a path to the grub.cfg file + + Returns: + str: unparsed grub.cfg items + """ + config_text = Path(grub_path).read_text() + regex_filter = compile(REGEX_VERSION, MULTILINE | DOTALL) + filtered = regex_filter.sub('', config_text) + regex_filter = compile(grub.REGEX_GRUB_VARS, MULTILINE) + filtered = regex_filter.sub('', filtered) + regex_filter = compile(grub.REGEX_GRUB_MODULES, MULTILINE) + filtered = regex_filter.sub('', filtered) + # strip extra new lines + filtered = filtered.strip() + return filtered + + +def get_search_root(unparsed: str) -> str: + unparsed_lines = unparsed.splitlines() + search_root = next((x for x in unparsed_lines if 'search' in x), '') + return search_root + + +def sanitize_boot_opts(boot_opts: str) -> str: + """Sanitize boot options from console and init + + Args: + boot_opts (str): boot options + + Returns: + str: sanitized boot options + """ + regex_filter = compile(REGEX_SANIT_CONSOLE) + boot_opts = regex_filter.sub('', boot_opts) + regex_filter = compile(REGEX_SANIT_INIT) + boot_opts = regex_filter.sub('', boot_opts) + # legacy tools add 'quiet' on add system image; this is not desired + regex_filter = compile(REGEX_SANIT_QUIET) + boot_opts = regex_filter.sub(' ', boot_opts) + + return boot_opts + + +def parse_entry(entry: tuple) -> dict: + """Parse GRUB menuentry + + Args: + entry (tuple): tuple of (version, options) + + Returns: + dict: dictionary with parsed options + """ + # save version to dict + entry_dict = {'version': entry[0]} + # detect boot mode type + if PW_RESET_OPTION in entry[1]: + entry_dict['bootmode'] = 'pw_reset' + else: + entry_dict['bootmode'] = 'normal' + # find console type and number + regex_filter = compile(REGEX_CONSOLE) + entry_dict.update(regex_filter.match(entry[1]).groupdict()) + entry_dict['boot_opts'] = sanitize_boot_opts(entry[1]) + + return entry_dict + + +def parse_menuentries(grub_path: str) -> list: + """Parse all GRUB menuentries + + Args: + grub_path (str): a path to GRUB config file + + Returns: + list: list with menu items (each item is a dict) + """ + menuentries = [] + # read configuration file + config_text = Path(grub_path).read_text() + # parse menuentries to tuples (version, options) + regex_filter = compile(REGEX_MENUENTRY, MULTILINE) + filter_results = regex_filter.findall(config_text) + # parse each entry + for entry in filter_results: + menuentries.append(parse_entry(entry)) + + return menuentries + + +def prune_vyos_versions(root_dir: str = '') -> None: + """Delete vyos-versions files of registered images subsequently deleted + or renamed by legacy image-tools + + Args: + root_dir (str): an optional path to the root directory + """ + if not root_dir: + root_dir = disk.find_persistence() + + for version in grub.version_list(): + if not Path(f'{root_dir}/boot/{version}').is_dir(): + grub.version_del(version) + + +def update_cfg_ver(root_dir:str = '') -> int: + """Get minumum version of image-tools across all installed images + + Args: + root_dir (str): an optional path to the root directory + + Returns: + int: minimum version of image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + prune_vyos_versions(root_dir) + + images_details = image.get_images_details() + cfg_version = min(d['tools_version'] for d in images_details) + + return cfg_version + + +def get_default(menu_entries: list, root_dir: str = '') -> Union[int, None]: + """Translate default version to menuentry index + + Args: + menu_entries (list): list of dicts of installed version boot data + root_dir (str): an optional path to the root directory + + Returns: + int: index of default version in menu_entries or None + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + image_name = image.get_default_image() + + sublist = list(filter(lambda x: x.get('version') == image_name, + menu_entries)) + if sublist: + return menu_entries.index(sublist[0]) + + return None + + +def update_version_list(root_dir: str = '') -> list[dict]: + """Update list of dicts of installed version boot data + + Args: + root_dir (str): an optional path to the root directory + + Returns: + list: list of dicts of installed version boot data + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + # get list of versions in menuentries + menu_entries = parse_menuentries(grub_cfg_main) + menu_versions = find_versions(menu_entries) + + # get list of versions added/removed by image-tools + current_versions = grub.version_list(root_dir) + + remove = list(set(menu_versions) - set(current_versions)) + for ver in remove: + menu_entries = list(filter(lambda x: x.get('version') != ver, + menu_entries)) + + add = list(set(current_versions) - set(menu_versions)) + for ver in add: + last = menu_entries[0].get('version') + new = deepcopy(list(filter(lambda x: x.get('version') == last, + menu_entries))) + for e in new: + boot_opts = e.get('boot_opts').replace(last, ver) + e.update({'version': ver, 'boot_opts': boot_opts}) + + menu_entries = new + menu_entries + + return menu_entries + + +def grub_cfg_fields(root_dir: str = '') -> dict: + """Gather fields for rendering grub.cfg + + Args: + root_dir (str): an optional path to the root directory + + Returns: + dict: dictionary for rendering TMPL_GRUB_COMPAT + """ + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = {'default': 0, 'timeout': 5} + # 'default' and 'timeout' from legacy grub.cfg + fields |= grub.vars_read(grub_cfg_main) + + fields['tools_version'] = SYSTEM_CFG_VER + menu_entries = update_version_list(root_dir) + fields['versions'] = menu_entries + + default = get_default(menu_entries, root_dir) + if default is not None: + fields['default'] = default + + modules = grub.modules_read(grub_cfg_main) + fields['modules'] = modules + + unparsed = filter_unparsed(grub_cfg_main).splitlines() + search_root = next((x for x in unparsed if 'search' in x), '') + fields['search_root'] = search_root + + return fields + + +def render_grub_cfg(root_dir: str = '') -> None: + """Render grub.cfg for legacy compatibility""" + if not root_dir: + root_dir = disk.find_persistence() + + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + + fields = grub_cfg_fields(root_dir) + render(grub_cfg_main, TMPL_GRUB_COMPAT, fields) + + +def grub_cfg_update(func): + """Decorator to update grub.cfg after function call""" + @wraps(func) + def wrapper(*args, **kwargs): + ret = func(*args, **kwargs) + if mode(): + render_grub_cfg() + return ret + return wrapper diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py new file mode 100644 index 000000000..b8a2c0f35 --- /dev/null +++ b/python/vyos/system/disk.py @@ -0,0 +1,229 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from json import loads as json_loads +from os import sync +from dataclasses import dataclass + +from psutil import disk_partitions + +from vyos.utils.process import run, cmd + + +@dataclass +class DiskDetails: + """Disk details""" + name: str + partition: dict[str, str] + + +def disk_cleanup(drive_path: str) -> None: + """Clean up disk partition table (MBR and GPT) + Remove partition and device signatures. + Zeroize primary and secondary headers - first and last 17408 bytes + (512 bytes * 34 LBA) on a drive + + Args: + drive_path (str): path to a drive that needs to be cleaned + """ + partitions: list[str] = partition_list(drive_path) + for partition in partitions: + run(f'wipefs -af {partition}') + run(f'wipefs -af {drive_path}') + run(f'sgdisk -Z {drive_path}') + + +def find_persistence() -> str: + """Find a mountpoint for persistence storage + + Returns: + str: Path where 'persistance' pertition is mounted, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint.endswith('/persistence'): + return partition.mountpoint + return '' + + +def parttable_create(drive_path: str, root_size: int) -> None: + """Create a hybrid MBR/GPT partition table + 0-2047 first sectors are free + 2048-4095 sectors - BIOS Boot Partition + 4096 + 256 MB - EFI system partition + Everything else till the end of a drive - Linux partition + + Args: + drive_path (str): path to a drive + """ + if not root_size: + root_size_text: str = '+100%' + else: + root_size_text: str = str(root_size) + command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ + -n3:0:+{root_size_text}K -t3:8300 {drive_path}' + + run(command) + # update partitons in kernel + sync() + run(f'partprobe {drive_path}') + + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + + +def partition_list(drive_path: str) -> list[str]: + """Get a list of partitions on a drive + + Args: + drive_path (str): path to a drive + + Returns: + list[str]: a list of partition paths + """ + lsblk: str = cmd(f'lsblk -Jp {drive_path}') + drive_info: dict = json_loads(lsblk) + device: list = drive_info.get('blockdevices') + children: list[str] = device[0].get('children', []) if device else [] + partitions: list[str] = [child.get('name') for child in children] + return partitions + + +def partition_parent(partition_path: str) -> str: + """Get a parent device for a partition + + Args: + partition (str): path to a partition + + Returns: + str: path to a parent device + """ + parent: str = cmd(f'lsblk -ndpo pkname {partition_path}') + return parent + + +def from_partition(partition_path: str) -> DiskDetails: + drive_path: str = partition_parent(partition_path) + partitions: list[str] = partition_list(drive_path) + + disk: DiskDetails = DiskDetails( + name = drive_path, + partition = { + 'efi': next(x for x in partitions if x.endswith('2')), + 'root': next(x for x in partitions if x.endswith('3')) + } + ) + + return disk + +def filesystem_create(partition: str, fstype: str) -> None: + """Create a filesystem on a partition + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + fstype (str): filesystem type ('efi' or 'ext4') + """ + if fstype == 'efi': + command = 'mkfs -t fat -n EFI' + run(f'{command} {partition}') + if fstype == 'ext4': + command = 'mkfs -t ext4 -L persistence' + run(f'{command} {partition}') + + +def partition_mount(partition: str, + path: str, + fsype: str = '', + overlay_params: dict[str, str] = {}) -> bool: + """Mount a partition into a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where to mount + fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') + overlay_params (dict): optionally, set overlay parameters. + Defaults to None. + + Returns: + bool: True on success + """ + if fsype in ['squashfs', 'iso9660']: + command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' + if fsype == 'overlay' and overlay_params: + command: str = f'mount -t overlay -o noatime,\ + upperdir={overlay_params["upperdir"]},\ + lowerdir={overlay_params["lowerdir"]},\ + workdir={overlay_params["workdir"]} overlay {path}' + + else: + command = f'mount {partition} {path}' + + rc = run(command) + if rc == 0: + return True + + return False + + +def partition_umount(partition: str = '', path: str = '') -> None: + """Umount a partition by a partition name or a path + + Args: + partition (str): path to a partition (for example: '/dev/sda1') + path (str): a path where a partition is mounted + """ + if partition: + command = f'umount {partition}' + run(command) + if path: + command = f'umount {path}' + run(command) + + +def find_device(mountpoint: str) -> str: + """Find a device by mountpoint + + Returns: + str: Path to device, Empty if not found + """ + mounted_partitions = disk_partitions() + for partition in mounted_partitions: + if partition.mountpoint == mountpoint: + return partition.mountpoint + return '' + + +def disks_size() -> dict[str, int]: + """Get a dictionary with physical disks and their sizes + + Returns: + dict[str, int]: a dictionary with name: size mapping + """ + disks_size: dict[str, int] = {} + lsblk: str = cmd('lsblk -Jbp') + blk_list = json_loads(lsblk) + for device in blk_list.get('blockdevices'): + if device['type'] == 'disk': + disks_size.update({device['name']: device['size']}) + return disks_size diff --git a/python/vyos/system/grub.py b/python/vyos/system/grub.py new file mode 100644 index 000000000..4ebf229a0 --- /dev/null +++ b/python/vyos/system/grub.py @@ -0,0 +1,342 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import MULTILINE, compile as re_compile +from typing import Union +from uuid import uuid5, NAMESPACE_URL, UUID + +from vyos.template import render +from vyos.utils.process import cmd +from vyos.system import disk + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_CFG_MAIN: str = f'{GRUB_DIR_MAIN}/grub.cfg' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_HEADER: str = f'{GRUB_DIR_VYOS}/00-vyos-header.cfg' +CFG_VYOS_MODULES: str = f'{GRUB_DIR_VYOS}/10-vyos-modules-autoload.cfg' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +CFG_VYOS_COMMON: str = f'{GRUB_DIR_VYOS}/25-vyos-common-autoload.cfg' +CFG_VYOS_PLATFORM: str = f'{GRUB_DIR_VYOS}/30-vyos-platform-autoload.cfg' +CFG_VYOS_MENU: str = f'{GRUB_DIR_VYOS}/40-vyos-menu-autoload.cfg' +CFG_VYOS_OPTIONS: str = f'{GRUB_DIR_VYOS}/50-vyos-options.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' + +TMPL_VYOS_VERSION: str = 'grub/grub_vyos_version.j2' +TMPL_GRUB_VARS: str = 'grub/grub_vars.j2' +TMPL_GRUB_MAIN: str = 'grub/grub_main.j2' +TMPL_GRUB_MENU: str = 'grub/grub_menu.j2' +TMPL_GRUB_MODULES: str = 'grub/grub_modules.j2' +TMPL_GRUB_OPTS: str = 'grub/grub_options.j2' +TMPL_GRUB_COMMON: str = 'grub/grub_common.j2' + +# prepare regexes +REGEX_GRUB_VARS: str = r'^set (?P<variable_name>.+)=[\'"]?(?P<variable_value>.*)(?<![\'"])[\'"]?$' +REGEX_GRUB_MODULES: str = r'^insmod (?P<module_name>.+)$' +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$' + + +def install(drive_path: str, boot_dir: str, efi_dir: str, id: str = 'VyOS') -> None: + """Install GRUB for both BIOS and EFI modes (hybrid boot) + + Args: + drive_path (str): path to a drive where GRUB must be installed + boot_dir (str): a path to '/boot' directory + efi_dir (str): a path to '/boot/efi' directory + """ + commands: list[str] = [ + f'grub-install --no-floppy --target=i386-pc --boot-directory={boot_dir} \ + {drive_path} --force', + f'grub-install --no-floppy --recheck --target=x86_64-efi \ + --force-extra-removable --boot-directory={boot_dir} \ + --efi-directory={efi_dir} --bootloader-id="{id}" \ + --no-uefi-secure-boot' + ] + for command in commands: + cmd(command) + + +def gen_version_uuid(version_name: str) -> str: + """Generate unique ID from version name + + Use UUID5 / NAMESPACE_URL with prefix `uuid5-` + + Args: + version_name (str): version name + + Returns: + str: generated unique ID + """ + ver_uuid: UUID = uuid5(NAMESPACE_URL, version_name) + ver_id: str = f'uuid5-{ver_uuid}' + return ver_id + + +def version_add(version_name: str, + root_dir: str = '', + boot_opts: str = '') -> None: + """Add a new VyOS version to GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + boot_opts (str): an optional boot options for Linux kernel. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{version_name}.cfg' + render( + version_config, TMPL_VYOS_VERSION, { + 'version_name': version_name, + 'version_uuid': gen_version_uuid(version_name), + 'boot_opts': boot_opts + }) + + +def version_del(vyos_version: str, root_dir: str = '') -> None: + """Delete a VyOS version from GRUB loader configuration + + Args: + vyos_version (str): VyOS version name + root_dir (str): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + version_config: str = f'{root_dir}/{GRUB_DIR_VYOS_VERS}/{vyos_version}.cfg' + Path(version_config).unlink(missing_ok=True) + + +def version_list(root_dir: str = '') -> list[str]: + """Generate a list with installed VyOS versions + + Args: + root_dir (str): an optional path to the root directory. + Defaults to empty. + + Returns: + list: A list with versions names + """ + if not root_dir: + root_dir = disk.find_persistence() + versions_files = Path(f'{root_dir}/{GRUB_DIR_VYOS_VERS}').glob('*.cfg') + versions_list: list[str] = [] + for file in versions_files: + versions_list.append(file.stem) + versions_list.sort() + + return versions_list + + +def read_env(env_file: str = '') -> dict[str, str]: + """Read GRUB environment + + Args: + env_file (str, optional): a path to grub environment file. + Defaults to empty. + + Returns: + dict: dictionary with GRUB environment + """ + if not env_file: + root_dir: str = disk.find_persistence() + env_file = f'{root_dir}/{GRUB_DIR_MAIN}/grubenv' + + env_content: str = cmd(f'grub-editenv {env_file} list').splitlines() + regex_filter = re_compile(r'^(?P<variable_name>.*)=(?P<variable_value>.*)$') + env_dict: dict[str, str] = {} + for env_item in env_content: + search_result = regex_filter.fullmatch(env_item) + if search_result: + search_result_dict: dict[str, str] = search_result.groupdict() + variable_name: str = search_result_dict.get('variable_name', '') + variable_value: str = search_result_dict.get('variable_value', '') + if variable_name and variable_value: + env_dict.update({variable_name: variable_value}) + return env_dict + + +def get_cfg_ver(root_dir: str = '') -> int: + """Get current version of GRUB configuration + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + cfg_ver: str = vars_read(f'{root_dir}/{CFG_VYOS_HEADER}').get( + 'VYOS_CFG_VER') + if cfg_ver: + cfg_ver_int: int = int(cfg_ver) + else: + cfg_ver_int: int = 0 + return cfg_ver_int + + +def write_cfg_ver(cfg_ver: int, root_dir: str = '') -> None: + """Write version number of GRUB configuration + + Args: + cfg_ver (int): a version number to write + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + + Returns: + int: a configuration version + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_HEADER}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['VYOS_CFG_VER'] = str(cfg_ver) + vars_write(vars_file, vars_current) + + +def vars_read(grub_cfg: str) -> dict[str, str]: + """Read variables from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + dict: a dictionary with variables and values + """ + vars_dict: dict[str, str] = {} + regex_filter = re_compile(REGEX_GRUB_VARS) + try: + config_text: list[str] = Path(grub_cfg).read_text().splitlines() + except FileNotFoundError: + return vars_dict + for line in config_text: + search_result = regex_filter.fullmatch(line) + if search_result: + search_dict = search_result.groupdict() + variable_name: str = search_dict.get('variable_name', '') + variable_value: str = search_dict.get('variable_value', '') + if variable_name and variable_value: + vars_dict.update({variable_name: variable_value}) + return vars_dict + + +def modules_read(grub_cfg: str) -> list[str]: + """Read modules list from a GRUB configuration file + + Args: + grub_cfg (str): a path to the GRUB config file + + Returns: + list: a list with modules to load + """ + mods_list: list[str] = [] + regex_filter = re_compile(REGEX_GRUB_MODULES, MULTILINE) + try: + config_text = Path(grub_cfg).read_text() + except FileNotFoundError: + return mods_list + mods_list = regex_filter.findall(config_text) + + return mods_list + + +def modules_write(grub_cfg: str, mods_list: list[str]) -> None: + """Write modules list to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + mods_list (list): a list with modules to load + """ + render(grub_cfg, TMPL_GRUB_MODULES, {'mods_list': mods_list}) + + +def vars_write(grub_cfg: str, grub_vars: dict[str, str]) -> None: + """Write variables to a GRUB configuration file (overwrite everything) + + Args: + grub_cfg (str): a path to GRUB configuration file + grub_vars (dict): a dictionary with new variables + """ + render(grub_cfg, TMPL_GRUB_VARS, {'vars': grub_vars}) + + +def set_default(version_name: str, root_dir: str = '') -> None: + """Set version as default boot entry + + Args: + version_name (str): versio name + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current = vars_read(vars_file) + vars_current['default'] = gen_version_uuid(version_name) + vars_write(vars_file, vars_current) + + +def common_write(root_dir: str = '', grub_common: dict[str, str] = {}) -> None: + """Write common GRUB configuration file (overwrite everything) + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + common_config = f'{root_dir}/{CFG_VYOS_COMMON}' + render(common_config, TMPL_GRUB_COMMON, grub_common) + + +def create_structure(root_dir: str = '') -> None: + """Create GRUB directories structure + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + """ + if not root_dir: + root_dir = disk.find_persistence() + + Path(f'{root_dir}/GRUB_DIR_VYOS_VERS').mkdir(parents=True, exist_ok=True) + + +def set_console_type(console_type: str, root_dir: str = '') -> None: + """Write default console type to GRUB configuration + + Args: + console_type (str): a default console type + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = vars_read(vars_file) + vars_current['console_type'] = str(console_type) + vars_write(vars_file, vars_current) + +def set_raid(root_dir: str = '') -> None: + pass diff --git a/python/vyos/system/image.py b/python/vyos/system/image.py new file mode 100644 index 000000000..c03ce02d5 --- /dev/null +++ b/python/vyos/system/image.py @@ -0,0 +1,268 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library. If not, see <http://www.gnu.org/licenses/>. + +from pathlib import Path +from re import compile as re_compile +from tempfile import TemporaryDirectory +from typing import TypedDict + +from vyos import version +from vyos.system import disk, grub + +# Define variables +GRUB_DIR_MAIN: str = '/boot/grub' +GRUB_DIR_VYOS: str = f'{GRUB_DIR_MAIN}/grub.cfg.d' +CFG_VYOS_VARS: str = f'{GRUB_DIR_VYOS}/20-vyos-defaults-autoload.cfg' +GRUB_DIR_VYOS_VERS: str = f'{GRUB_DIR_VYOS}/vyos-versions' +# prepare regexes +REGEX_KERNEL_CMDLINE: str = r'^BOOT_IMAGE=/(?P<boot_type>boot|live)/((?P<image_version>.+)/)?vmlinuz.*$' +REGEX_SYSTEM_CFG_VER: str = r'(\r\n|\r|\n)SYSTEM_CFG_VER\s*=\s*(?P<cfg_ver>\d+)(\r\n|\r|\n)' + + +# structures definitions +class ImageDetails(TypedDict): + name: str + version: str + tools_version: int + disk_ro: int + disk_rw: int + disk_total: int + + +class BootDetails(TypedDict): + image_default: str + image_running: str + images_available: list[str] + console_type: str + console_num: int + + +def bootmode_detect() -> str: + """Detect system boot mode + + Returns: + str: 'bios' or 'efi' + """ + if Path('/sys/firmware/efi/').exists(): + return 'efi' + else: + return 'bios' + + +def get_image_version(mount_path: str) -> str: + """Extract version name from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: version name + """ + version_file: str = Path( + f'{mount_path}/opt/vyatta/etc/version').read_text() + version_name: str = version_file.lstrip('Version: ').strip() + + return version_name + + +def get_image_tools_version(mount_path: str) -> int: + """Extract image-tools version from rootfs mounted at mount_path + + Args: + mount_path (str): mount path of rootfs + + Returns: + str: image-tools version + """ + try: + version_file: str = Path( + f'{mount_path}/usr/lib/python3/dist-packages/vyos/system/__init__.py').read_text() + except FileNotFoundError: + system_cfg_ver: int = 0 + else: + res = re_compile(REGEX_SYSTEM_CFG_VER).search(version_file) + system_cfg_ver: int = int(res.groupdict().get('cfg_ver', 0)) + + return system_cfg_ver + + +def get_versions(image_name: str, root_dir: str = '') -> dict[str, str]: + """Return versions of image and image-tools + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + dict[str, int]: a dictionary with versions of image and image-tools + """ + if not root_dir: + root_dir = disk.find_persistence() + + squashfs_file: str = next( + Path(f'{root_dir}/boot/{image_name}').glob('*.squashfs')).as_posix() + with TemporaryDirectory() as squashfs_mounted: + disk.partition_mount(squashfs_file, squashfs_mounted, 'squashfs') + + image_version: str = get_image_version(squashfs_mounted) + image_tools_version: int = get_image_tools_version(squashfs_mounted) + + disk.partition_umount(squashfs_file) + + versions: dict[str, int] = { + 'image': image_version, + 'image-tools': image_tools_version + } + + return versions + + +def get_details(image_name: str, root_dir: str = '') -> ImageDetails: + """Return information about image + + Args: + image_name (str): a name of an image + root_dir (str, optional): an optional path to the root directory. + Defaults to ''. + + Returns: + ImageDetails: a dictionary with details about an image (name, size) + """ + if not root_dir: + root_dir = disk.find_persistence() + + versions = get_versions(image_name, root_dir) + image_version: str = versions.get('image', '') + image_tools_version: int = versions.get('image-tools', 0) + + image_path: Path = Path(f'{root_dir}/boot/{image_name}') + image_path_rw: Path = Path(f'{root_dir}/boot/{image_name}/rw') + + image_disk_ro: int = int() + for item in image_path.iterdir(): + if not item.is_symlink(): + image_disk_ro += item.stat().st_size + + image_disk_rw: int = int() + for item in image_path_rw.rglob('*'): + if not item.is_symlink(): + image_disk_rw += item.stat().st_size + + image_details: ImageDetails = { + 'name': image_name, + 'version': image_version, + 'tools_version': image_tools_version, + 'disk_ro': image_disk_ro, + 'disk_rw': image_disk_rw, + 'disk_total': image_disk_ro + image_disk_rw + } + + return image_details + + +def get_images_details() -> list[ImageDetails]: + """Return information about all images + + Returns: + list[ImageDetails]: a list of dictionaries with details about images + """ + images: list[str] = grub.version_list() + images_details: list[ImageDetails] = list() + for image_name in images: + images_details.append(get_details(image_name)) + + return images_details + + +def get_running_image() -> str: + """Find currently running image name + + Returns: + str: image name + """ + running_image: str = '' + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + running_image: str = running_image_result.groupdict().get( + 'image_version', '') + # we need to have a fallback for live systems + if not running_image: + running_image: str = version.get_version() + + return running_image + + +def get_default_image(root_dir: str = '') -> str: + """Get default boot entry + + Args: + root_dir (str, optional): an optional path to the root directory. + Defaults to empty. + Returns: + str: a version name + """ + if not root_dir: + root_dir = disk.find_persistence() + + vars_file: str = f'{root_dir}/{CFG_VYOS_VARS}' + vars_current: dict[str, str] = grub.vars_read(vars_file) + default_uuid: str = vars_current.get('default', '') + if default_uuid: + images_list: list[str] = grub.version_list(root_dir) + for image_name in images_list: + if default_uuid == grub.gen_version_uuid(image_name): + return image_name + return '' + else: + return '' + + +def validate_name(image_name: str) -> bool: + """Validate image name + + Args: + image_name (str): suggested image name + + Returns: + bool: validation result + """ + regex_filter = re_compile(r'^[\w\.+-]{1,32}$') + if regex_filter.match(image_name): + return True + return False + + +def is_live_boot() -> bool: + """Detect live booted system + + Returns: + bool: True if the system currently booted in live mode + """ + regex_filter = re_compile(REGEX_KERNEL_CMDLINE) + cmdline: str = Path('/proc/cmdline').read_text() + running_image_result = regex_filter.match(cmdline) + if running_image_result: + boot_type: str = running_image_result.groupdict().get('boot_type', '') + if boot_type == 'live': + return True + return False + +def is_running_as_container() -> bool: + if Path('/.dockerenv').exists(): + return True + return False diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py new file mode 100644 index 000000000..5b33d34da --- /dev/null +++ b/python/vyos/system/raid.py @@ -0,0 +1,122 @@ +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library. If not, see <http://www.gnu.org/licenses/>. + +"""RAID related functions""" + +from pathlib import Path +from shutil import copy +from dataclasses import dataclass + +from vyos.utils.process import cmd, run +from vyos.system import disk + + +@dataclass +class RaidDetails: + """RAID type""" + name: str + level: str + members: list[str] + disks: list[disk.DiskDetails] + + +def raid_create(raid_members: list[str], + raid_name: str = 'md0', + raid_level: str = 'raid1') -> None: + """Create a RAID array + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + raid_members (list[str]): a list of array members + raid_level (str, optional): an array level. Defaults to 'raid1'. + """ + raid_devices_num: int = len(raid_members) + raid_members_str: str = ' '.join(raid_members) + for part in raid_members: + drive: str = disk.partition_parent(part) + # set partition type GUID for raid member; cf. + # https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs + command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' + cmd(command) + command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \ + --raid-devices={raid_devices_num} --level={raid_level} \ + {raid_members_str}' + + cmd(command) + + raid = RaidDetails( + name = f'/dev/{raid_name}', + level = raid_level, + members = raid_members, + disks = [disk.from_partition(m) for m in raid_members] + ) + + return raid + +def clear(): + """Deactivate all RAID arrays""" + command: str = 'mdadm --examine --scan' + raid_config = cmd(command) + if not raid_config: + return + command: str = 'mdadm --run /dev/md?*' + run(command) + command: str = 'mdadm --assemble --scan --auto=yes --symlink=no' + run(command) + command: str = 'mdadm --stop --scan' + run(command) + + +def update_initramfs() -> None: + """Update initramfs""" + mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm' + copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script) + p = Path(mdadm_script) + p.write_text(p.read_text().replace('$((COUNT + 1))', '20')) + command: str = 'update-initramfs -u' + cmd(command) + +def update_default(target_dir: str) -> None: + """Update /etc/default/mdadm to start MD monitoring daemon at boot + """ + source_mdadm_config = '/etc/default/mdadm' + target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm') + target_mdadm_config_dir = Path(target_mdadm_config).parent + Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True) + s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false', + 'START_DAEMON=true') + Path(target_mdadm_config).write_text(s) + +def get_uuid(device: str) -> str: + """Get UUID of a device""" + command: str = f'tune2fs -l {device}' + l = cmd(command).splitlines() + uuid = next((x for x in l if x.startswith('Filesystem UUID')), '') + return uuid.split(':')[1].strip() if uuid else '' + +def get_uuids(raid_details: RaidDetails) -> tuple[str]: + """Get UUIDs of RAID members + + Args: + raid_name (str): a name of array (data, backup, test, etc.) + + Returns: + tuple[str]: root_disk uuid, root_md uuid + """ + raid_name: str = raid_details.name + root_partition: str = raid_details.members[0] + uuid_root_disk: str = get_uuid(root_partition) + uuid_root_md: str = get_uuid(raid_name) + return uuid_root_disk, uuid_root_md diff --git a/python/vyos/utils/convert.py b/python/vyos/utils/convert.py index 9a8a1ff7d..c02f0071e 100644 --- a/python/vyos/utils/convert.py +++ b/python/vyos/utils/convert.py @@ -1,197 +1,200 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. def seconds_to_human(s, separator=""): """ Converts number of seconds passed to a human-readable interval such as 1w4d18h35m59s """ s = int(s) week = 60 * 60 * 24 * 7 day = 60 * 60 * 24 hour = 60 * 60 remainder = 0 result = "" weeks = s // week if weeks > 0: result = "{0}w".format(weeks) s = s % week days = s // day if days > 0: result = "{0}{1}{2}d".format(result, separator, days) s = s % day hours = s // hour if hours > 0: result = "{0}{1}{2}h".format(result, separator, hours) s = s % hour minutes = s // 60 if minutes > 0: result = "{0}{1}{2}m".format(result, separator, minutes) s = s % 60 seconds = s if seconds > 0: result = "{0}{1}{2}s".format(result, separator, seconds) return result -def bytes_to_human(bytes, initial_exponent=0, precision=2): +def bytes_to_human(bytes, initial_exponent=0, precision=2, + int_below_exponent=0): """ Converts a value in bytes to a human-readable size string like 640 KB The initial_exponent parameter is the exponent of 2, e.g. 10 (1024) for kilobytes, 20 (1024 * 1024) for megabytes. """ if bytes == 0: return "0 B" from math import log2 bytes = bytes * (2**initial_exponent) # log2 is a float, while range checking requires an int exponent = int(log2(bytes)) + if exponent < int_below_exponent: + precision = 0 if exponent < 10: value = bytes suffix = "B" elif exponent in range(10, 20): value = bytes / 1024 suffix = "KB" elif exponent in range(20, 30): value = bytes / 1024**2 suffix = "MB" elif exponent in range(30, 40): value = bytes / 1024**3 suffix = "GB" else: value = bytes / 1024**4 suffix = "TB" # Add a new case when the first machine with petabyte RAM # hits the market. size_string = "{0:.{1}f} {2}".format(value, precision, suffix) return size_string def human_to_bytes(value): """ Converts a data amount with a unit suffix to bytes, like 2K to 2048 """ from re import match as re_match res = re_match(r'^\s*(\d+(?:\.\d+)?)\s*([a-zA-Z]+)\s*$', value) if not res: raise ValueError(f"'{value}' is not a valid data amount") else: amount = float(res.group(1)) unit = res.group(2).lower() if unit == 'b': res = amount elif (unit == 'k') or (unit == 'kb'): res = amount * 1024 elif (unit == 'm') or (unit == 'mb'): res = amount * 1024**2 elif (unit == 'g') or (unit == 'gb'): res = amount * 1024**3 elif (unit == 't') or (unit == 'tb'): res = amount * 1024**4 else: raise ValueError(f"Unsupported data unit '{unit}'") # There cannot be fractional bytes, so we convert them to integer. # However, truncating causes problems with conversion back to human unit, # so we round instead -- that seems to work well enough. return round(res) def mac_to_eui64(mac, prefix=None): """ Convert a MAC address to a EUI64 address or, with prefix provided, a full IPv6 address. Thankfully copied from https://gist.github.com/wido/f5e32576bb57b5cc6f934e177a37a0d3 """ import re from ipaddress import ip_network # http://tools.ietf.org/html/rfc4291#section-2.5.1 eui64 = re.sub(r'[.:-]', '', mac).lower() eui64 = eui64[0:6] + 'fffe' + eui64[6:] eui64 = hex(int(eui64[0:2], 16) ^ 2)[2:].zfill(2) + eui64[2:] if prefix is None: return ':'.join(re.findall(r'.{4}', eui64)) else: try: net = ip_network(prefix, strict=False) euil = int('0x{0}'.format(eui64), 16) return str(net[euil]) except: # pylint: disable=bare-except return def convert_data(data) -> dict | list | tuple | str | int | float | bool | None: """Filter and convert multiple types of data to types usable in CLI/API WARNING: Must not be used for anything except formatting output for API or CLI On the output allowed everything supported in JSON. Args: data (Any): input data Returns: dict | list | tuple | str | int | float | bool | None: converted data """ from base64 import b64encode # return original data for types which do not require conversion if isinstance(data, str | int | float | bool | None): return data if isinstance(data, list): list_tmp = [] for item in data: list_tmp.append(convert_data(item)) return list_tmp if isinstance(data, tuple): list_tmp = list(data) tuple_tmp = tuple(convert_data(list_tmp)) return tuple_tmp if isinstance(data, bytes | bytearray): try: return data.decode() except UnicodeDecodeError: return b64encode(data).decode() if isinstance(data, set | frozenset): list_tmp = convert_data(list(data)) return list_tmp if isinstance(data, dict): dict_tmp = {} for key, value in data.items(): dict_tmp[key] = convert_data(value) return dict_tmp # do not return anything for other types # which cannot be converted to JSON # for example: complex | range | memoryview return diff --git a/python/vyos/utils/file.py b/python/vyos/utils/file.py index 667a2464b..9f27a7fb9 100644 --- a/python/vyos/utils/file.py +++ b/python/vyos/utils/file.py @@ -1,183 +1,189 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. import os from vyos.utils.permission import chown def makedir(path, user=None, group=None): if os.path.exists(path): return os.makedirs(path, mode=0o755) chown(path, user, group) def file_is_persistent(path): import re location = r'^(/config|/opt/vyatta/etc/config)' absolute = os.path.abspath(os.path.dirname(path)) return re.match(location,absolute) def read_file(fname, defaultonfailure=None): """ read the content of a file, stripping any end characters (space, newlines) should defaultonfailure be not None, it is returned on failure to read """ try: """ Read a file to string """ with open(fname, 'r') as f: data = f.read().strip() return data except Exception as e: if defaultonfailure is not None: return defaultonfailure raise e def write_file(fname, data, defaultonfailure=None, user=None, group=None, mode=None, append=False): """ Write content of data to given fname, should defaultonfailure be not None, it is returned on failure to read. If directory of file is not present, it is auto-created. """ dirname = os.path.dirname(fname) if not os.path.isdir(dirname): os.makedirs(dirname, mode=0o755, exist_ok=False) chown(dirname, user, group) try: """ Write a file to string """ bytes = 0 with open(fname, 'w' if not append else 'a') as f: bytes = f.write(data) chown(fname, user, group) chmod(fname, mode) return bytes except Exception as e: if defaultonfailure is not None: return defaultonfailure raise e def read_json(fname, defaultonfailure=None): """ read and json decode the content of a file should defaultonfailure be not None, it is returned on failure to read """ import json try: with open(fname, 'r') as f: data = json.load(f) return data except Exception as e: if defaultonfailure is not None: return defaultonfailure raise e def chown(path, user, group): """ change file/directory owner """ from pwd import getpwnam from grp import getgrnam if user is None or group is None: return False # path may also be an open file descriptor if not isinstance(path, int) and not os.path.exists(path): return False uid = getpwnam(user).pw_uid gid = getgrnam(group).gr_gid os.chown(path, uid, gid) return True def chmod(path, bitmask): # path may also be an open file descriptor if not isinstance(path, int) and not os.path.exists(path): return if bitmask is None: return os.chmod(path, bitmask) def chmod_600(path): """ Make file only read/writable by owner """ from stat import S_IRUSR, S_IWUSR bitmask = S_IRUSR | S_IWUSR chmod(path, bitmask) def chmod_750(path): """ Make file/directory only executable to user and group """ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP chmod(path, bitmask) def chmod_755(path): """ Make file executable by all """ from stat import S_IRUSR, S_IWUSR, S_IXUSR, S_IRGRP, S_IXGRP, S_IROTH, S_IXOTH bitmask = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IXGRP | \ S_IROTH | S_IXOTH chmod(path, bitmask) +def chmod_2775(path): + """ user/group permissions with set-group-id bit set """ + from stat import S_ISGID, S_IRWXU, S_IRWXG, S_IROTH, S_IXOTH + + bitmask = S_ISGID | S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH + chmod(path, bitmask) def makedir(path, user=None, group=None): if os.path.exists(path): return os.makedirs(path, mode=0o755) chown(path, user, group) def wait_for_inotify(file_path, pre_hook=None, event_type=None, timeout=None, sleep_interval=0.1): """ Waits for an inotify event to occur """ if not os.path.dirname(file_path): raise ValueError( "File path {} does not have a directory part (required for inotify watching)".format(file_path)) if not os.path.basename(file_path): raise ValueError( "File path {} does not have a file part, do not know what to watch for".format(file_path)) from inotify.adapters import Inotify from time import time from time import sleep time_start = time() i = Inotify() i.add_watch(os.path.dirname(file_path)) if pre_hook: pre_hook() for event in i.event_gen(yield_nones=True): if (timeout is not None) and ((time() - time_start) > timeout): # If the function didn't return until this point, # the file failed to have been written to and closed within the timeout raise OSError("Waiting for file {} to be written has failed".format(file_path)) # Most such events don't take much time, so it's better to check right away # and sleep later. if event is not None: (_, type_names, path, filename) = event if filename == os.path.basename(file_path): if event_type in type_names: return sleep(sleep_interval) def wait_for_file_write_complete(file_path, pre_hook=None, timeout=None, sleep_interval=0.1): """ Waits for a process to close a file after opening it in write mode. """ wait_for_inotify(file_path, event_type='IN_CLOSE_WRITE', pre_hook=pre_hook, timeout=timeout, sleep_interval=sleep_interval) diff --git a/python/vyos/utils/io.py b/python/vyos/utils/io.py index 8790cbaac..0afaf695c 100644 --- a/python/vyos/utils/io.py +++ b/python/vyos/utils/io.py @@ -1,74 +1,104 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. +from typing import Callable + def print_error(str='', end='\n'): """ Print `str` to stderr, terminated with `end`. Used for warnings and out-of-band messages to avoid mangling precious stdout output. """ import sys sys.stderr.write(str) sys.stderr.write(end) sys.stderr.flush() -def ask_input(question, default='', numeric_only=False, valid_responses=[]): +def ask_input(question, default='', numeric_only=False, valid_responses=[], + no_echo=False): + from getpass import getpass question_out = question if default: question_out += f' (Default: {default})' response = '' while True: - response = input(question_out + ' ').strip() + if not no_echo: + response = input(question_out + ' ').strip() + else: + response = getpass(question_out + ' ').strip() if not response and default: return default if numeric_only: if not response.isnumeric(): print("Invalid value, try again.") continue response = int(response) if valid_responses and response not in valid_responses: print("Invalid value, try again.") continue break return response def ask_yes_no(question, default=False) -> bool: """Ask a yes/no question via input() and return their answer.""" from sys import stdout default_msg = "[Y/n]" if default else "[y/N]" while True: try: stdout.write("%s %s " % (question, default_msg)) c = input().lower() if c == '': return default elif c in ("y", "ye", "yes"): return True elif c in ("n", "no"): return False else: stdout.write("Please respond with yes/y or no/n\n") except EOFError: stdout.write("\nPlease respond with yes/y or no/n\n") def is_interactive(): """Try to determine if the routine was called from an interactive shell.""" import os, sys return os.getenv('TERM', default=False) and sys.stderr.isatty() and sys.stdout.isatty() def is_dumb_terminal(): """Check if the current TTY is dumb, so that we can disable advanced terminal features.""" import os return os.getenv('TERM') in ['vt100', 'dumb'] + +def select_entry(l: list, list_msg: str = '', prompt_msg: str = '', + list_format: Callable = None,) -> str: + """Select an entry from a list + + Args: + l (list): a list of entries + list_msg (str): a message to print before listing the entries + prompt_msg (str): a message to print as prompt for selection + + Returns: + str: a selected entry + """ + en = list(enumerate(l, 1)) + print(list_msg) + for i, e in en: + if list_format: + print(f'\t{i}: {list_format(e)}') + else: + print(f'\t{i}: {e}') + select = ask_input(prompt_msg, numeric_only=True, + valid_responses=range(1, len(l)+1)) + return next(filter(lambda x: x[0] == select, en))[1] diff --git a/src/helpers/simple-download.py b/src/helpers/simple-download.py new file mode 100755 index 000000000..501af75f5 --- /dev/null +++ b/src/helpers/simple-download.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 + +import sys +from argparse import ArgumentParser +from vyos.remote import download + +parser = ArgumentParser() +parser.add_argument('--local-file', help='local file', required=True) +parser.add_argument('--remote-path', help='remote path', required=True) + +args = parser.parse_args() + +try: + download(args.local_file, args.remote_path, + check_space=True, raise_error=True) +except Exception as e: + print(e) + sys.exit(1) + +sys.exit() diff --git a/src/op_mode/image_info.py b/src/op_mode/image_info.py new file mode 100755 index 000000000..791001e00 --- /dev/null +++ b/src/op_mode/image_info.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 +# +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +import sys +from typing import List, Union + +from tabulate import tabulate + +from vyos import opmode +from vyos.system import disk, grub, image +from vyos.utils.convert import bytes_to_human + + +def _format_show_images_summary(images_summary: image.BootDetails) -> str: + headers: list[str] = ['Name', 'Default boot', 'Running'] + table_data: list[list[str]] = list() + for image_item in images_summary.get('images_available', []): + name: str = image_item + if images_summary.get('image_default') == name: + default: str = 'Yes' + else: + default: str = '' + + if images_summary.get('image_running') == name: + running: str = 'Yes' + else: + running: str = '' + + table_data.append([name, default, running]) + tabulated: str = tabulate(table_data, headers) + + return tabulated + + +def _format_show_images_details( + images_details: list[image.ImageDetails]) -> str: + headers: list[str] = [ + 'Name', 'Version', 'Storage Read-Only', 'Storage Read-Write', + 'Storage Total' + ] + table_data: list[list[Union[str, int]]] = list() + for image_item in images_details: + name: str = image_item.get('name') + version: str = image_item.get('version') + disk_ro: str = bytes_to_human(image_item.get('disk_ro'), + precision=1, int_below_exponent=30) + disk_rw: str = bytes_to_human(image_item.get('disk_rw'), + precision=1, int_below_exponent=30) + disk_total: str = bytes_to_human(image_item.get('disk_total'), + precision=1, int_below_exponent=30) + table_data.append([name, version, disk_ro, disk_rw, disk_total]) + tabulated: str = tabulate(table_data, headers, + colalign=('left', 'left', 'right', 'right', 'right')) + + return tabulated + + +def show_images_summary(raw: bool) -> Union[image.BootDetails, str]: + images_available: list[str] = grub.version_list() + root_dir: str = disk.find_persistence() + boot_vars: dict = grub.vars_read(f'{root_dir}/{image.CFG_VYOS_VARS}') + + images_summary: image.BootDetails = dict() + + images_summary['image_default'] = image.get_default_image() + images_summary['image_running'] = image.get_running_image() + images_summary['images_available'] = images_available + images_summary['console_type'] = boot_vars.get('console_type') + images_summary['console_num'] = boot_vars.get('console_num') + + if raw: + return images_summary + else: + return _format_show_images_summary(images_summary) + + +def show_images_details(raw: bool) -> Union[list[image.ImageDetails], str]: + images_details = image.get_images_details() + + if raw: + return images_details + else: + return _format_show_images_details(images_details) + + +if __name__ == '__main__': + try: + res = opmode.run(sys.modules[__name__]) + if res: + print(res) + except (ValueError, opmode.Error) as e: + print(e) + sys.exit(1) diff --git a/src/op_mode/image_installer.py b/src/op_mode/image_installer.py new file mode 100755 index 000000000..6a8797aec --- /dev/null +++ b/src/op_mode/image_installer.py @@ -0,0 +1,904 @@ +#!/usr/bin/env python3 +# +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from argparse import ArgumentParser, Namespace +from pathlib import Path +from shutil import copy, chown, rmtree, copytree +from glob import glob +from sys import exit +from os import environ +from time import sleep +from typing import Union +from urllib.parse import urlparse +from passlib.hosts import linux_context + +from psutil import disk_partitions + +from vyos.configtree import ConfigTree +from vyos.remote import download +from vyos.system import disk, grub, image, compat, raid, SYSTEM_CFG_VER +from vyos.template import render +from vyos.utils.io import ask_input, ask_yes_no, select_entry +from vyos.utils.file import chmod_2775 +from vyos.utils.process import cmd, run + +# define text messages +MSG_ERR_NOT_LIVE: str = 'The system is already installed. Please use "add system image" instead.' +MSG_ERR_LIVE: str = 'The system is in live-boot mode. Please use "install image" instead.' +MSG_ERR_NO_DISK: str = 'No suitable disk was found. There must be at least one disk of 2GB or greater size.' +MSG_ERR_IMPROPER_IMAGE: str = 'Missing sha256sum.txt.\nEither this image is corrupted, or of era 1.2.x (md5sum) and would downgrade image tools;\ndisallowed in either case.' +MSG_INFO_INSTALL_WELCOME: str = 'Welcome to VyOS installation!\nThis command will install VyOS to your permanent storage.' +MSG_INFO_INSTALL_EXIT: str = 'Exiting from VyOS installation' +MSG_INFO_INSTALL_SUCCESS: str = 'The image installed successfully; please reboot now.' +MSG_INFO_INSTALL_DISKS_LIST: str = 'The following disks were found:' +MSG_INFO_INSTALL_DISK_SELECT: str = 'Which one should be used for installation?' +MSG_INFO_INSTALL_RAID_CONFIGURE: str = 'Would you like to configure RAID-1 mirroring?' +MSG_INFO_INSTALL_RAID_FOUND_DISKS: str = 'Would you like to configure RAID-1 mirroring on them?' +MSG_INFO_INSTALL_RAID_CHOOSE_DISKS: str = 'Would you like to choose two disks for RAID-1 mirroring?' +MSG_INFO_INSTALL_DISK_CONFIRM: str = 'Installation will delete all data on the drive. Continue?' +MSG_INFO_INSTALL_RAID_CONFIRM: str = 'Installation will delete all data on both drives. Continue?' +MSG_INFO_INSTALL_PARTITONING: str = 'Creating partition table...' +MSG_INPUT_CONFIG_FOUND: str = 'An active configuration was found. Would you like to copy it to the new image?' +MSG_INPUT_IMAGE_NAME: str = 'What would you like to name this image?' +MSG_INPUT_IMAGE_DEFAULT: str = 'Would you like to set the new image as the default one for boot?' +MSG_INPUT_PASSWORD: str = 'Please enter a password for the "vyos" user' +MSG_INPUT_ROOT_SIZE_ALL: str = 'Would you like to use all the free space on the drive?' +MSG_INPUT_ROOT_SIZE_SET: str = 'Please specify the size (in GB) of the root partition (min is 1.5 GB)?' +MSG_INPUT_CONSOLE_TYPE: str = 'What console should be used by default? (K: KVM, S: Serial, U: USB-Serial)?' +MSG_INPUT_COPY_DATA: str = 'Would you like to copy data to the new image?' +MSG_INPUT_CHOOSE_COPY_DATA: str = 'From which image would you like to save config information?' +MSG_WARN_ISO_SIGN_INVALID: str = 'Signature is not valid. Do you want to continue with installation?' +MSG_WARN_ISO_SIGN_UNAVAL: str = 'Signature is not available. Do you want to continue with installation?' +MSG_WARN_ROOT_SIZE_TOOBIG: str = 'The size is too big. Try again.' +MSG_WARN_ROOT_SIZE_TOOSMALL: str = 'The size is too small. Try again' +MSG_WARN_IMAGE_NAME_WRONG: str = 'The suggested name is unsupported!\n' +'It must be between 1 and 32 characters long and contains only the next characters: .+-_ a-z A-Z 0-9' +CONST_MIN_DISK_SIZE: int = 2147483648 # 2 GB +CONST_MIN_ROOT_SIZE: int = 1610612736 # 1.5 GB +# a reserved space: 2MB for header, 1 MB for BIOS partition, 256 MB for EFI +CONST_RESERVED_SPACE: int = (2 + 1 + 256) * 1024**2 + +# define directories and paths +DIR_INSTALLATION: str = '/mnt/installation' +DIR_ROOTFS_SRC: str = f'{DIR_INSTALLATION}/root_src' +DIR_ROOTFS_DST: str = f'{DIR_INSTALLATION}/root_dst' +DIR_ISO_MOUNT: str = f'{DIR_INSTALLATION}/iso_src' +DIR_DST_ROOT: str = f'{DIR_INSTALLATION}/disk_dst' +DIR_KERNEL_SRC: str = '/boot/' +FILE_ROOTFS_SRC: str = '/usr/lib/live/mount/medium/live/filesystem.squashfs' +ISO_DOWNLOAD_PATH: str = '/tmp/vyos_installation.iso' + +external_download_script = '/usr/libexec/vyos/simple-download.py' + +# default boot variables +DEFAULT_BOOT_VARS: dict[str, str] = { + 'timeout': '5', + 'console_type': 'tty', + 'console_num': '0', + 'bootmode': 'normal' +} + + +def bytes_to_gb(size: int) -> float: + """Convert Bytes to GBytes, rounded to 1 decimal number + + Args: + size (int): input size in bytes + + Returns: + float: size in GB + """ + return round(size / 1024**3, 1) + + +def gb_to_bytes(size: float) -> int: + """Convert GBytes to Bytes + + Args: + size (float): input size in GBytes + + Returns: + int: size in bytes + """ + return int(size * 1024**3) + + +def find_disks() -> dict[str, int]: + """Find a target disk for installation + + Returns: + dict[str, int]: a list of available disks by name and size + """ + # check for available disks + print('Probing disks') + disks_available: dict[str, int] = disk.disks_size() + for disk_name, disk_size in disks_available.copy().items(): + if disk_size < CONST_MIN_DISK_SIZE: + del disks_available[disk_name] + if not disks_available: + print(MSG_ERR_NO_DISK) + exit(MSG_INFO_INSTALL_EXIT) + + num_disks: int = len(disks_available) + print(f'{num_disks} disk(s) found') + + return disks_available + + +def ask_root_size(available_space: int) -> int: + """Define a size of root partition + + Args: + available_space (int): available space in bytes for a root partition + + Returns: + int: defined size + """ + if ask_yes_no(MSG_INPUT_ROOT_SIZE_ALL, default=True): + return available_space + + while True: + root_size_gb: str = ask_input(MSG_INPUT_ROOT_SIZE_SET) + root_size_kbytes: int = (gb_to_bytes(float(root_size_gb))) // 1024 + + if root_size_kbytes > available_space: + print(MSG_WARN_ROOT_SIZE_TOOBIG) + continue + if root_size_kbytes < CONST_MIN_ROOT_SIZE / 1024: + print(MSG_WARN_ROOT_SIZE_TOOSMALL) + continue + + return root_size_kbytes + +def create_partitions(target_disk: str, target_size: int, + prompt: bool = True) -> None: + """Create partitions on a target disk + + Args: + target_disk (str): a target disk + target_size (int): size of disk in bytes + """ + # define target rootfs size in KB (smallest unit acceptable by sgdisk) + available_size: int = (target_size - CONST_RESERVED_SPACE) // 1024 + if prompt: + rootfs_size: int = ask_root_size(available_size) + else: + rootfs_size: int = available_size + + print(MSG_INFO_INSTALL_PARTITONING) + raid.clear() + disk.disk_cleanup(target_disk) + disk_details: disk.DiskDetails = disk.parttable_create(target_disk, + rootfs_size) + + return disk_details + + +def search_format_selection(image: tuple[str, str]) -> str: + """Format a string for selection of image + + Args: + image (tuple[str, str]): a tuple of image name and drive + + Returns: + str: formatted string + """ + return f'{image[0]} on {image[1]}' + + +def search_previous_installation(disks: list[str]) -> None: + """Search disks for previous installation config and SSH keys + + Args: + disks (list[str]): a list of available disks + """ + mnt_config = '/mnt/config' + mnt_ssh = '/mnt/ssh' + mnt_tmp = '/mnt/tmp' + rmtree(Path(mnt_config), ignore_errors=True) + rmtree(Path(mnt_ssh), ignore_errors=True) + Path(mnt_tmp).mkdir(exist_ok=True) + + print('Searching for data from previous installations') + image_data = [] + for disk_name in disks: + for partition in disk.partition_list(disk_name): + if disk.partition_mount(partition, mnt_tmp): + if Path(mnt_tmp + '/boot').exists(): + for path in Path(mnt_tmp + '/boot').iterdir(): + if path.joinpath('rw/config/.vyatta_config').exists(): + image_data.append((path.name, partition)) + + disk.partition_umount(partition) + + if len(image_data) == 1: + image_name, image_drive = image_data[0] + print('Found data from previous installation:') + print(f'\t{image_name} on {image_drive}') + if not ask_yes_no(MSG_INPUT_COPY_DATA, default=True): + return + + elif len(image_data) > 1: + print('Found data from previous installations') + if not ask_yes_no(MSG_INPUT_COPY_DATA, default=True): + return + + image_name, image_drive = select_entry(image_data, + 'Available versions:', + MSG_INPUT_CHOOSE_COPY_DATA, + search_format_selection) + else: + print('No previous installation found') + return + + disk.partition_mount(image_drive, mnt_tmp) + + copytree(f'{mnt_tmp}/boot/{image_name}/rw/config', mnt_config) + Path(mnt_ssh).mkdir() + host_keys: list[str] = glob(f'{mnt_tmp}/boot/{image_name}/rw/etc/ssh/ssh_host*') + for host_key in host_keys: + copy(host_key, mnt_ssh) + + disk.partition_umount(image_drive) + + +def copy_previous_installation_data(target_dir: str) -> None: + if Path('/mnt/config').exists(): + copytree('/mnt/config', f'{target_dir}/opt/vyatta/etc/config', + dirs_exist_ok=True) + if Path('/mnt/ssh').exists(): + copytree('/mnt/ssh', f'{target_dir}/etc/ssh', + dirs_exist_ok=True) + + +def ask_single_disk(disks_available: dict[str, int]) -> str: + """Ask user to select a disk for installation + + Args: + disks_available (dict[str, int]): a list of available disks + """ + print(MSG_INFO_INSTALL_DISKS_LIST) + default_disk: str = list(disks_available)[0] + for disk_name, disk_size in disks_available.items(): + disk_size_human: str = bytes_to_gb(disk_size) + print(f'Drive: {disk_name} ({disk_size_human} GB)') + disk_selected: str = ask_input(MSG_INFO_INSTALL_DISK_SELECT, + default=default_disk, + valid_responses=list(disks_available)) + + # create partitions + if not ask_yes_no(MSG_INFO_INSTALL_DISK_CONFIRM): + print(MSG_INFO_INSTALL_EXIT) + exit() + + search_previous_installation(list(disks_available)) + + disk_details: disk.DiskDetails = create_partitions(disk_selected, + disks_available[disk_selected]) + + disk.filesystem_create(disk_details.partition['efi'], 'efi') + disk.filesystem_create(disk_details.partition['root'], 'ext4') + + return disk_details + + +def check_raid_install(disks_available: dict[str, int]) -> Union[str, None]: + """Ask user to select disks for RAID installation + + Args: + disks_available (dict[str, int]): a list of available disks + """ + if len(disks_available) < 2: + return None + + if not ask_yes_no(MSG_INFO_INSTALL_RAID_CONFIGURE, default=True): + return None + + def format_selection(disk_name: str) -> str: + return f'{disk_name}\t({bytes_to_gb(disks_available[disk_name])} GB)' + + disk0, disk1 = list(disks_available)[0], list(disks_available)[1] + disks_selected: dict[str, int] = { disk0: disks_available[disk0], + disk1: disks_available[disk1] } + + target_size: int = min(disks_selected[disk0], disks_selected[disk1]) + + print(MSG_INFO_INSTALL_DISKS_LIST) + for disk_name, disk_size in disks_selected.items(): + disk_size_human: str = bytes_to_gb(disk_size) + print(f'\t{disk_name} ({disk_size_human} GB)') + if not ask_yes_no(MSG_INFO_INSTALL_RAID_FOUND_DISKS, default=True): + if not ask_yes_no(MSG_INFO_INSTALL_RAID_CHOOSE_DISKS, default=True): + return None + else: + disks_selected = {} + disk0 = select_entry(list(disks_available), 'Disks available:', + 'Select first disk:', format_selection) + + disks_selected[disk0] = disks_available[disk0] + del disks_available[disk0] + disk1 = select_entry(list(disks_available), 'Remaining disks:', + 'Select second disk:', format_selection) + disks_selected[disk1] = disks_available[disk1] + + target_size: int = min(disks_selected[disk0], + disks_selected[disk1]) + + # create partitions + if not ask_yes_no(MSG_INFO_INSTALL_RAID_CONFIRM): + print(MSG_INFO_INSTALL_EXIT) + exit() + + search_previous_installation(list(disks_available)) + + disks: list[disk.DiskDetails] = [] + for disk_selected in list(disks_selected): + print(f'Creating partitions on {disk_selected}') + disk_details = create_partitions(disk_selected, target_size, + prompt=False) + disk.filesystem_create(disk_details.partition['efi'], 'efi') + + disks.append(disk_details) + + print('Creating RAID array') + members = [disk.partition['root'] for disk in disks] + raid_details: raid.RaidDetails = raid.raid_create(members) + # raid init stuff + print('Updating initramfs') + raid.update_initramfs() + # end init + print('Creating filesystem on RAID array') + disk.filesystem_create(raid_details.name, 'ext4') + + return raid_details + + +def prepare_tmp_disr() -> None: + """Create temporary directories for installation + """ + print('Creating temporary directories') + for dir in [DIR_ROOTFS_SRC, DIR_ROOTFS_DST, DIR_DST_ROOT]: + dirpath = Path(dir) + dirpath.mkdir(mode=0o755, parents=True) + + +def setup_grub(root_dir: str) -> None: + """Install GRUB configurations + + Args: + root_dir (str): a path to the root of target filesystem + """ + print('Installing GRUB configuration files') + grub_cfg_main = f'{root_dir}/{grub.GRUB_DIR_MAIN}/grub.cfg' + grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}' + grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' + grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}' + grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}' + + # create new files + render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) + grub.common_write(root_dir) + grub.vars_write(grub_cfg_vars, DEFAULT_BOOT_VARS) + grub.modules_write(grub_cfg_modules, []) + grub.write_cfg_ver(1, root_dir) + render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) + render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) + + +def configure_authentication(config_file: str, password: str) -> None: + """Write encrypted password to config file + + Args: + config_file (str): path of target config file + password (str): plaintext password + + N.B. this can not be deferred by simply setting the plaintext password + and relying on the config mode script to process at boot, as the config + will not automatically be saved in that case, thus leaving the + plaintext exposed + """ + encrypted_password = linux_context.hash(password) + + with open(config_file) as f: + config_string = f.read() + + config = ConfigTree(config_string) + config.set([ + 'system', 'login', 'user', 'vyos', 'authentication', + 'encrypted-password' + ], + value=encrypted_password, + replace=True) + config.set_tag(['system', 'login', 'user']) + + with open(config_file, 'w') as f: + f.write(config.to_string()) + +def validate_signature(file_path: str, sign_type: str) -> None: + """Validate a file by signature and delete a signature file + + Args: + file_path (str): a path to file + sign_type (str): a signature type + """ + print('Validating signature') + signature_valid: bool = False + # validate with minisig + if sign_type == 'minisig': + for pubkey in [ + '/usr/share/vyos/keys/vyos-release.minisign.pub', + '/usr/share/vyos/keys/vyos-backup.minisign.pub' + ]: + if run(f'minisign -V -q -p {pubkey} -m {file_path} -x {file_path}.minisig' + ) == 0: + signature_valid = True + break + Path(f'{file_path}.minisig').unlink() + # validate with GPG + if sign_type == 'asc': + if run(f'gpg --verify ${file_path}.asc ${file_path}') == 0: + signature_valid = True + Path(f'{file_path}.asc').unlink() + + # warn or pass + if not signature_valid: + if not ask_yes_no(MSG_WARN_ISO_SIGN_INVALID, default=False): + exit(MSG_INFO_INSTALL_EXIT) + else: + print('Signature is valid') + +def download_file(local_file: str, remote_path: str, vrf: str, + username: str, password: str, + progressbar: bool = False, check_space: bool = False): + environ['REMOTE_USERNAME'] = username + environ['REMOTE_PASSWORD'] = password + if vrf is None: + download(local_file, remote_path, progressbar=progressbar, + check_space=check_space, raise_error=True) + else: + vrf_cmd = f'REMOTE_USERNAME={username} REMOTE_PASSWORD={password} \ + ip vrf exec {vrf} {external_download_script} \ + --local-file {local_file} --remote-path {remote_path}' + cmd(vrf_cmd) + +def image_fetch(image_path: str, vrf: str = None, + username: str = '', password: str = '', + no_prompt: bool = False) -> Path: + """Fetch an ISO image + + Args: + image_path (str): a path, remote or local + + Returns: + Path: a path to a local file + """ + try: + # check a type of path + if urlparse(image_path).scheme: + # download an image + download_file(ISO_DOWNLOAD_PATH, image_path, vrf, + username, password, + progressbar=True, check_space=True) + + # download a signature + sign_file = (False, '') + for sign_type in ['minisig', 'asc']: + try: + download_file(f'{ISO_DOWNLOAD_PATH}.{sign_type}', + f'{image_path}.{sign_type}', vrf, + username, password) + sign_file = (True, sign_type) + break + except Exception: + print(f'{sign_type} signature is not available') + # validate a signature if it is available + if sign_file[0]: + validate_signature(ISO_DOWNLOAD_PATH, sign_file[1]) + else: + if (not no_prompt and + not ask_yes_no(MSG_WARN_ISO_SIGN_UNAVAL, default=False)): + cleanup() + exit(MSG_INFO_INSTALL_EXIT) + + return Path(ISO_DOWNLOAD_PATH) + else: + local_path: Path = Path(image_path) + if local_path.is_file(): + return local_path + else: + raise FileNotFoundError + except Exception as e: + print(f'The image cannot be fetched from: {image_path} {e}') + exit(1) + + +def migrate_config() -> bool: + """Check for active config and ask user for migration + + Returns: + bool: user's decision + """ + active_config_path: Path = Path('/opt/vyatta/etc/config/config.boot') + if active_config_path.exists(): + if ask_yes_no(MSG_INPUT_CONFIG_FOUND, default=True): + return True + return False + + +def copy_ssh_host_keys() -> bool: + """Ask user to copy SSH host keys + + Returns: + bool: user's decision + """ + if ask_yes_no('Would you like to copy SSH host keys?', default=True): + return True + return False + + +def cleanup(mounts: list[str] = [], remove_items: list[str] = []) -> None: + """Clean up after installation + + Args: + mounts (list[str], optional): List of mounts to unmount. + Defaults to []. + remove_items (list[str], optional): List of files or directories + to remove. Defaults to []. + """ + print('Cleaning up') + # clean up installation directory by default + mounts_all = disk_partitions(all=True) + for mounted_device in mounts_all: + if mounted_device.mountpoint.startswith(DIR_INSTALLATION) and not ( + mounted_device.device in mounts or + mounted_device.mountpoint in mounts): + mounts.append(mounted_device.mountpoint) + # add installation dir to cleanup list + if DIR_INSTALLATION not in remove_items: + remove_items.append(DIR_INSTALLATION) + # also delete an ISO file + if Path(ISO_DOWNLOAD_PATH).exists( + ) and ISO_DOWNLOAD_PATH not in remove_items: + remove_items.append(ISO_DOWNLOAD_PATH) + + if mounts: + print('Unmounting target filesystems') + for mountpoint in mounts: + disk.partition_umount(mountpoint) + if remove_items: + print('Removing temporary files') + for remove_item in remove_items: + if Path(remove_item).exists(): + if Path(remove_item).is_file(): + Path(remove_item).unlink() + if Path(remove_item).is_dir(): + rmtree(remove_item) + +def cleanup_raid(details: raid.RaidDetails) -> None: + efiparts = [] + for raid_disk in details.disks: + efiparts.append(raid_disk.partition['efi']) + cleanup([details.name, *efiparts], + ['/mnt/installation']) + + +def is_raid_install(install_object: Union[disk.DiskDetails, raid.RaidDetails]) -> bool: + """Check if installation target is a RAID array + + Args: + install_object (Union[disk.DiskDetails, raid.RaidDetails]): a target disk + + Returns: + bool: True if it is a RAID array + """ + if isinstance(install_object, raid.RaidDetails): + return True + return False + + +def install_image() -> None: + """Install an image to a disk + """ + if not image.is_live_boot(): + exit(MSG_ERR_NOT_LIVE) + + print(MSG_INFO_INSTALL_WELCOME) + if not ask_yes_no('Would you like to continue?'): + print(MSG_INFO_INSTALL_EXIT) + exit() + + # configure image name + running_image_name: str = image.get_running_image() + while True: + image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, + running_image_name) + if image.validate_name(image_name): + break + print(MSG_WARN_IMAGE_NAME_WRONG) + + # ask for password + user_password: str = ask_input(MSG_INPUT_PASSWORD, default='vyos', + no_echo=True) + + # ask for default console + console_type: str = ask_input(MSG_INPUT_CONSOLE_TYPE, + default='K', + valid_responses=['K', 'S', 'U']) + console_dict: dict[str, str] = {'K': 'tty', 'S': 'ttyS', 'U': 'ttyUSB'} + + disks: dict[str, int] = find_disks() + + install_target: Union[disk.DiskDetails, raid.RaidDetails, None] = None + try: + install_target = check_raid_install(disks) + if install_target is None: + install_target = ask_single_disk(disks) + + # create directories for installation media + prepare_tmp_disr() + + # mount target filesystem and create required dirs inside + print('Mounting new partitions') + if is_raid_install(install_target): + disk.partition_mount(install_target.name, DIR_DST_ROOT) + Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) + else: + disk.partition_mount(install_target.partition['root'], DIR_DST_ROOT) + Path(f'{DIR_DST_ROOT}/boot/efi').mkdir(parents=True) + disk.partition_mount(install_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi') + + # a config dir. It is the deepest one, so the comand will + # create all the rest in a single step + print('Creating a configuration file') + target_config_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw/opt/vyatta/etc/config/' + Path(target_config_dir).mkdir(parents=True) + chown(target_config_dir, group='vyattacfg') + chmod_2775(target_config_dir) + # copy config + copy('/opt/vyatta/etc/config/config.boot', target_config_dir) + configure_authentication(f'{target_config_dir}/config.boot', + user_password) + Path(f'{target_config_dir}/.vyatta_config').touch() + + # create a persistence.conf + Path(f'{DIR_DST_ROOT}/persistence.conf').write_text('/ union\n') + + # copy system image and kernel files + print('Copying system image files') + for file in Path(DIR_KERNEL_SRC).iterdir(): + if file.is_file(): + copy(file, f'{DIR_DST_ROOT}/boot/{image_name}/') + copy(FILE_ROOTFS_SRC, + f'{DIR_DST_ROOT}/boot/{image_name}/{image_name}.squashfs') + + # copy saved config data and SSH keys + # owner restored on copy of config data by chmod_2775, above + copy_previous_installation_data(f'{DIR_DST_ROOT}/boot/{image_name}/rw') + + if is_raid_install(install_target): + write_dir: str = f'{DIR_DST_ROOT}/boot/{image_name}/rw' + raid.update_default(write_dir) + + setup_grub(DIR_DST_ROOT) + # add information about version + grub.create_structure() + grub.version_add(image_name, DIR_DST_ROOT) + grub.set_default(image_name, DIR_DST_ROOT) + grub.set_console_type(console_dict[console_type], DIR_DST_ROOT) + + if is_raid_install(install_target): + # add RAID specific modules + grub.modules_write(f'{DIR_DST_ROOT}/{grub.CFG_VYOS_MODULES}', + ['part_msdos', 'part_gpt', 'diskfilter', + 'ext2','mdraid1x']) + # install GRUB + if is_raid_install(install_target): + print('Installing GRUB to the drives') + l = install_target.disks + for disk_target in l: + disk.partition_mount(disk_target.partition['efi'], f'{DIR_DST_ROOT}/boot/efi') + grub.install(disk_target.name, f'{DIR_DST_ROOT}/boot/', + f'{DIR_DST_ROOT}/boot/efi', + id=f'VyOS (RAID disk {l.index(disk_target) + 1})') + disk.partition_umount(disk_target.partition['efi']) + else: + print('Installing GRUB to the drive') + grub.install(install_target.name, f'{DIR_DST_ROOT}/boot/', + f'{DIR_DST_ROOT}/boot/efi') + + # umount filesystems and remove temporary files + if is_raid_install(install_target): + cleanup([install_target.name], + ['/mnt/installation']) + else: + cleanup([install_target.partition['efi'], + install_target.partition['root']], + ['/mnt/installation']) + + # we are done + print(MSG_INFO_INSTALL_SUCCESS) + exit() + + except Exception as err: + print(f'Unable to install VyOS: {err}') + # unmount filesystems and clenup + try: + if install_target is not None: + if is_raid_install(install_target): + cleanup_raid(install_target) + else: + cleanup([install_target.partition['efi'], + install_target.partition['root']], + ['/mnt/installation']) + except Exception as err: + print(f'Cleanup failed: {err}') + + exit(1) + + +@compat.grub_cfg_update +def add_image(image_path: str, vrf: str = None, username: str = '', + password: str = '', no_prompt: bool = False) -> None: + """Add a new image + + Args: + image_path (str): a path to an ISO image + """ + if image.is_live_boot(): + exit(MSG_ERR_LIVE) + + # fetch an image + iso_path: Path = image_fetch(image_path, vrf, username, password, no_prompt) + try: + # mount an ISO + Path(DIR_ISO_MOUNT).mkdir(mode=0o755, parents=True) + disk.partition_mount(iso_path, DIR_ISO_MOUNT, 'iso9660') + + # check sums + print('Validating image checksums') + if not Path(DIR_ISO_MOUNT).joinpath('sha256sum.txt').exists(): + cleanup() + exit(MSG_ERR_IMPROPER_IMAGE) + if run(f'cd {DIR_ISO_MOUNT} && sha256sum --status -c sha256sum.txt'): + cleanup() + exit('Image checksum verification failed.') + + # mount rootfs (to get a system version) + Path(DIR_ROOTFS_SRC).mkdir(mode=0o755, parents=True) + disk.partition_mount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', + DIR_ROOTFS_SRC, 'squashfs') + + cfg_ver: str = image.get_image_tools_version(DIR_ROOTFS_SRC) + version_name: str = image.get_image_version(DIR_ROOTFS_SRC) + + disk.partition_umount(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs') + + if cfg_ver < SYSTEM_CFG_VER: + raise compat.DowngradingImageTools( + f'Adding image would downgrade image tools to v.{cfg_ver}; disallowed') + + if not no_prompt: + image_name: str = ask_input(MSG_INPUT_IMAGE_NAME, version_name) + set_as_default: bool = ask_yes_no(MSG_INPUT_IMAGE_DEFAULT, default=True) + else: + image_name: str = version_name + set_as_default: bool = True + + # find target directory + root_dir: str = disk.find_persistence() + + # a config dir. It is the deepest one, so the comand will + # create all the rest in a single step + target_config_dir: str = f'{root_dir}/boot/{image_name}/rw/opt/vyatta/etc/config/' + # copy config + if no_prompt or migrate_config(): + print('Copying configuration directory') + # copytree preserves perms but not ownership: + Path(target_config_dir).mkdir(parents=True) + chown(target_config_dir, group='vyattacfg') + chmod_2775(target_config_dir) + copytree('/opt/vyatta/etc/config/', target_config_dir, + dirs_exist_ok=True) + else: + Path(target_config_dir).mkdir(parents=True) + chown(target_config_dir, group='vyattacfg') + chmod_2775(target_config_dir) + Path(f'{target_config_dir}/.vyatta_config').touch() + + target_ssh_dir: str = f'{root_dir}/boot/{image_name}/rw/etc/ssh/' + if no_prompt or copy_ssh_host_keys(): + print('Copying SSH host keys') + Path(target_ssh_dir).mkdir(parents=True) + host_keys: list[str] = glob('/etc/ssh/ssh_host*') + for host_key in host_keys: + copy(host_key, target_ssh_dir) + + # copy system image and kernel files + print('Copying system image files') + for file in Path(f'{DIR_ISO_MOUNT}/live').iterdir(): + if file.is_file() and (file.match('initrd*') or + file.match('vmlinuz*')): + copy(file, f'{root_dir}/boot/{image_name}/') + copy(f'{DIR_ISO_MOUNT}/live/filesystem.squashfs', + f'{root_dir}/boot/{image_name}/{image_name}.squashfs') + + # unmount an ISO and cleanup + cleanup([str(iso_path)]) + + # add information about version + grub.version_add(image_name, root_dir) + if set_as_default: + grub.set_default(image_name, root_dir) + + except Exception as err: + # unmount an ISO and cleanup + cleanup([str(iso_path)]) + exit(f'Whooops: {err}') + + +def parse_arguments() -> Namespace: + """Parse arguments + + Returns: + Namespace: a namespace with parsed arguments + """ + parser: ArgumentParser = ArgumentParser( + description='Install new system images') + parser.add_argument('--action', + choices=['install', 'add'], + required=True, + help='action to perform with an image') + parser.add_argument('--vrf', + help='vrf name for image download') + parser.add_argument('--no-prompt', action='store_true', + help='perform action non-interactively') + parser.add_argument('--username', default='', + help='username for image download') + parser.add_argument('--password', default='', + help='password for image download') + parser.add_argument('--image-path', + help='a path (HTTP or local file) to an image that needs to be installed' + ) + # parser.add_argument('--image_new_name', help='a new name for image') + args: Namespace = parser.parse_args() + # Validate arguments + if args.action == 'add' and not args.image_path: + exit('A path to image is required for add action') + + return args + + +if __name__ == '__main__': + try: + args: Namespace = parse_arguments() + if args.action == 'install': + install_image() + if args.action == 'add': + add_image(args.image_path, args.vrf, + args.username, args.password, args.no_prompt) + + exit() + + except KeyboardInterrupt: + print('Stopped by Ctrl+C') + cleanup() + exit() + + except Exception as err: + exit(f'{err}') diff --git a/src/op_mode/image_manager.py b/src/op_mode/image_manager.py new file mode 100755 index 000000000..e75485f9f --- /dev/null +++ b/src/op_mode/image_manager.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +# +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from argparse import ArgumentParser, Namespace +from pathlib import Path +from shutil import rmtree +from sys import exit +from typing import Optional + +from vyos.system import disk, grub, image, compat +from vyos.utils.io import ask_yes_no, select_entry + +SET_IMAGE_LIST_MSG: str = 'The following images are available:' +SET_IMAGE_PROMPT_MSG: str = 'Select an image to set as default:' +DELETE_IMAGE_LIST_MSG: str = 'The following images are installed:' +DELETE_IMAGE_PROMPT_MSG: str = 'Select an image to delete:' +MSG_DELETE_IMAGE_RUNNING: str = 'Currently running image cannot be deleted; reboot into another image first' +MSG_DELETE_IMAGE_DEFAULT: str = 'Default image cannot be deleted; set another image as default first' + + +@compat.grub_cfg_update +def delete_image(image_name: Optional[str] = None, + no_prompt: bool = False) -> None: + """Remove installed image files and boot entry + + Args: + image_name (str): a name of image to delete + """ + available_images: list[str] = grub.version_list() + if image_name is None: + if no_prompt: + exit('An image name is required for delete action') + else: + image_name = select_entry(available_images, + DELETE_IMAGE_LIST_MSG, + DELETE_IMAGE_PROMPT_MSG) + if image_name == image.get_running_image(): + exit(MSG_DELETE_IMAGE_RUNNING) + if image_name == image.get_default_image(): + exit(MSG_DELETE_IMAGE_DEFAULT) + if image_name not in available_images: + exit(f'The image "{image_name}" cannot be found') + persistence_storage: str = disk.find_persistence() + if not persistence_storage: + exit('Persistence storage cannot be found') + + if (not no_prompt and + not ask_yes_no(f'Do you really want to delete the image {image_name}?', + default=False)): + exit() + + # remove files and menu entry + version_path: Path = Path(f'{persistence_storage}/boot/{image_name}') + try: + rmtree(version_path) + grub.version_del(image_name, persistence_storage) + print(f'The image "{image_name}" was successfully deleted') + except Exception as err: + exit(f'Unable to remove the image "{image_name}": {err}') + + +@compat.grub_cfg_update +def set_image(image_name: Optional[str] = None, + prompt: bool = True) -> None: + """Set default boot image + + Args: + image_name (str): an image name + """ + available_images: list[str] = grub.version_list() + if image_name is None: + if not prompt: + exit('An image name is required for set action') + else: + image_name = select_entry(available_images, + SET_IMAGE_LIST_MSG, + SET_IMAGE_PROMPT_MSG) + if image_name == image.get_default_image(): + exit(f'The image "{image_name}" already configured as default') + if image_name not in available_images: + exit(f'The image "{image_name}" cannot be found') + persistence_storage: str = disk.find_persistence() + if not persistence_storage: + exit('Persistence storage cannot be found') + + # set default boot image + try: + grub.set_default(image_name, persistence_storage) + print(f'The image "{image_name}" is now default boot image') + except Exception as err: + exit(f'Unable to set default image "{image_name}": {err}') + + +@compat.grub_cfg_update +def rename_image(name_old: str, name_new: str) -> None: + """Rename installed image + + Args: + name_old (str): old name + name_new (str): new name + """ + if name_old == image.get_running_image(): + exit('Currently running image cannot be renamed') + available_images: list[str] = grub.version_list() + if name_old not in available_images: + exit(f'The image "{name_old}" cannot be found') + if name_new in available_images: + exit(f'The image "{name_new}" already exists') + if not image.validate_name(name_new): + exit(f'The image name "{name_new}" is not allowed') + + persistence_storage: str = disk.find_persistence() + if not persistence_storage: + exit('Persistence storage cannot be found') + + if not ask_yes_no( + f'Do you really want to rename the image {name_old} ' + f'to the {name_new}?', + default=False): + exit() + + try: + # replace default boot item + if name_old == image.get_default_image(): + grub.set_default(name_new, persistence_storage) + + # rename files and dirs + old_path: Path = Path(f'{persistence_storage}/boot/{name_old}') + new_path: Path = Path(f'{persistence_storage}/boot/{name_new}') + old_path.rename(new_path) + + # replace boot item + grub.version_del(name_old, persistence_storage) + grub.version_add(name_new, persistence_storage) + + print(f'The image "{name_old}" was renamed to "{name_new}"') + except Exception as err: + exit(f'Unable to rename image "{name_old}" to "{name_new}": {err}') + + +def list_images() -> None: + """Print list of available images for CLI hints""" + images_list: list[str] = grub.version_list() + for image_name in images_list: + print(image_name) + + +def parse_arguments() -> Namespace: + """Parse arguments + + Returns: + Namespace: a namespace with parsed arguments + """ + parser: ArgumentParser = ArgumentParser(description='Manage system images') + parser.add_argument('--action', + choices=['delete', 'set', 'rename', 'list'], + required=True, + help='action to perform with an image') + parser.add_argument('--no-prompt', action='store_true', + help='perform action non-interactively') + parser.add_argument( + '--image-name', + help= + 'a name of an image to add, delete, install, rename, or set as default') + parser.add_argument('--image-new-name', help='a new name for image') + args: Namespace = parser.parse_args() + # Validate arguments + if args.action == 'rename' and (not args.image_name or + not args.image_new_name): + exit('Both old and new image names are required for rename action') + + return args + + +if __name__ == '__main__': + try: + args: Namespace = parse_arguments() + if args.action == 'delete': + delete_image(args.image_name, args.no_prompt) + if args.action == 'set': + set_image(args.image_name) + if args.action == 'rename': + rename_image(args.image_name, args.image_new_name) + if args.action == 'list': + list_images() + + exit() + + except KeyboardInterrupt: + print('Stopped by Ctrl+C') + exit() + + except Exception as err: + exit(f'{err}') diff --git a/src/system/grub_update.py b/src/system/grub_update.py new file mode 100644 index 000000000..3c851f0e0 --- /dev/null +++ b/src/system/grub_update.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +# +# Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> +# +# This file is part of VyOS. +# +# VyOS is free software: you can redistribute it and/or modify it under the +# terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# VyOS is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# VyOS. If not, see <https://www.gnu.org/licenses/>. + +from pathlib import Path +from sys import exit + +from vyos.system import disk, grub, image, compat, SYSTEM_CFG_VER +from vyos.template import render + + +def cfg_check_update() -> bool: + """Check if GRUB structure update is required + + Returns: + bool: False if not required, True if required + """ + current_ver = grub.get_cfg_ver() + if current_ver and current_ver >= SYSTEM_CFG_VER: + return False + + return True + + +if __name__ == '__main__': + if image.is_live_boot(): + exit(0) + + if image.is_running_as_container(): + exit(0) + + # Skip everything if update is not required + if not cfg_check_update(): + exit(0) + + # find root directory of persistent storage + root_dir = disk.find_persistence() + + # read current GRUB config + grub_cfg_main = f'{root_dir}/{grub.GRUB_CFG_MAIN}' + vars = grub.vars_read(grub_cfg_main) + modules = grub.modules_read(grub_cfg_main) + vyos_menuentries = compat.parse_menuentries(grub_cfg_main) + vyos_versions = compat.find_versions(vyos_menuentries) + unparsed_items = compat.filter_unparsed(grub_cfg_main) + # compatibilty for raid installs + search_root = compat.get_search_root(unparsed_items) + common_dict = {} + common_dict['search_root'] = search_root + # find default values + default_entry = vyos_menuentries[int(vars['default'])] + default_settings = { + 'default': grub.gen_version_uuid(default_entry['version']), + 'bootmode': default_entry['bootmode'], + 'console_type': default_entry['console_type'], + 'console_num': default_entry['console_num'] + } + vars.update(default_settings) + + # create new files + grub_cfg_vars = f'{root_dir}/{grub.CFG_VYOS_VARS}' + grub_cfg_modules = f'{root_dir}/{grub.CFG_VYOS_MODULES}' + grub_cfg_platform = f'{root_dir}/{grub.CFG_VYOS_PLATFORM}' + grub_cfg_menu = f'{root_dir}/{grub.CFG_VYOS_MENU}' + grub_cfg_options = f'{root_dir}/{grub.CFG_VYOS_OPTIONS}' + + Path(image.GRUB_DIR_VYOS).mkdir(exist_ok=True) + grub.vars_write(grub_cfg_vars, vars) + grub.modules_write(grub_cfg_modules, modules) + grub.common_write(grub_common=common_dict) + render(grub_cfg_menu, grub.TMPL_GRUB_MENU, {}) + render(grub_cfg_options, grub.TMPL_GRUB_OPTS, {}) + + # create menu entries + for vyos_ver in vyos_versions: + boot_opts = None + for entry in vyos_menuentries: + if entry.get('version') == vyos_ver and entry.get( + 'bootmode') == 'normal': + boot_opts = entry.get('boot_opts') + grub.version_add(vyos_ver, root_dir, boot_opts) + + # update structure version + cfg_ver = compat.update_cfg_ver(root_dir) + grub.write_cfg_ver(cfg_ver, root_dir) + + if compat.mode(): + compat.render_grub_cfg(root_dir) + else: + render(grub_cfg_main, grub.TMPL_GRUB_MAIN, {}) + + exit(0) diff --git a/src/system/standalone_root_pw_reset b/src/system/standalone_root_pw_reset new file mode 100755 index 000000000..c82cea321 --- /dev/null +++ b/src/system/standalone_root_pw_reset @@ -0,0 +1,178 @@ +#!/bin/bash +# **** License **** +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# This code was originally developed by Vyatta, Inc. +# Portions created by Vyatta are Copyright (C) 2007 Vyatta, Inc. +# All Rights Reserved. +# +# Author: Bob Gilligan <gilligan@vyatta.com> +# Description: Standalone script to set the admin passwd to new value +# value. Note: This script can ONLY be run as a standalone +# init program by grub. +# +# **** End License **** + +# The Vyatta config file: +CF=/opt/vyatta/etc/config/config.boot + +# Admin user name +ADMIN=vyos + +set_encrypted_password() { + sed -i \ + -e "/ user $1 {/,/encrypted-password/s/encrypted-password .*\$/encrypted-password \"$2\"/" $3 +} + + +# How long to wait for user to respond, in seconds +TIME_TO_WAIT=30 + +change_password() { + local user=$1 + local pwd1="1" + local pwd2="2" + + until [ "$pwd1" == "$pwd2" ] + do + read -p "Enter $user password: " -r -s pwd1 + echo + read -p "Retype $user password: " -r -s pwd2 + echo + + if [ "$pwd1" != "$pwd2" ] + then echo "Passwords do not match" + fi + done + + # set the password for the user then store it in the config + # so the user is recreated on the next full system boot. + local epwd=$(mkpasswd --method=sha-512 "$pwd1") + # escape any slashes in resulting password + local eepwd=$(sed 's:/:\\/:g' <<< $epwd) + set_encrypted_password $user $eepwd $CF +} + +# System is so messed up that doing anything would be a mistake +dead() { + echo $* + echo + echo "This tool can only recover missing admininistrator password." + echo "It is not a full system restore" + echo + echo -n "Hit return to reboot system: " + read + /sbin/reboot -f +} + +echo "Standalone root password recovery tool." +echo +# +# Check to see if we are running in standalone mode. We'll +# know that we are if our pid is 1. +# +if [ "$$" != "1" ]; then + echo "This tool can only be run in standalone mode." + exit 1 +fi + +# +# OK, now we know we are running in standalone mode. Talk to the +# user. +# +echo -n "Do you wish to reset the admin password? (y or n) " +read -t $TIME_TO_WAIT response +if [ "$?" != "0" ]; then + echo + echo "Response not received in time." + echo "The admin password will not be reset." + echo "Rebooting in 5 seconds..." + sleep 5 + echo + /sbin/reboot -f +fi + +response=${response:0:1} +if [ "$response" != "y" -a "$response" != "Y" ]; then + echo "OK, the admin password will not be reset." + echo -n "Rebooting in 5 seconds..." + sleep 5 + echo + /sbin/reboot -f +fi + +echo -en "Which admin account do you want to reset? [$ADMIN] " +read admin_user +ADMIN=${admin_user:-$ADMIN} + +echo "Starting process to reset the admin password..." + +echo "Re-mounting root filesystem read/write..." +mount -o remount,rw / + +if [ ! -f /etc/passwd ] +then dead "Missing password file" +fi + +if [ ! -d /opt/vyatta/etc/config ] +then dead "Missing VyOS config directory /opt/vyatta/etc/config" +fi + +# Leftover from V3.0 +if grep -q /opt/vyatta/etc/config /etc/fstab +then + echo "Mounting the config filesystem..." + mount /opt/vyatta/etc/config/ +fi + +if [ ! -f $CF ] +then dead "$CF file not found" +fi + +if ! grep -q 'system {' $CF +then dead "$CF file does not contain system settings" +fi + +if ! grep -q ' login {' $CF +then + # Recreate login section of system + sed -i -e '/system {/a\ + login {\ + }' $CF +fi + +if ! grep -q " user $ADMIN " $CF +then + echo "Recreating administrator $ADMIN in $CF..." + sed -i -e "/ login {/a\\ + user $ADMIN {\\ + authentication {\\ + encrypted-password \$6$IhbXHdwgYkLnt/$VRIsIN5c2f2v4L2l4F9WPDrRDEtWXzH75yBswmWGERAdX7oBxmq6m.sWON6pO6mi6mrVgYBxdVrFcCP5bI.nt.\\ + plaintext-password \"\"\\ + }\\ + level admin\\ + }" $CF +fi + +echo "Saving backup copy of config.boot..." +cp $CF ${CF}.before_pwrecovery +sync + +echo "Setting the administrator ($ADMIN) password..." +change_password $ADMIN + +echo $(date "+%b%e %T") $(hostname) "Admin password changed" \ + | tee -a /var/log/auth.log >>/var/log/messages + +sync + +echo "System will reboot in 10 seconds..." +sleep 10 +/sbin/reboot -f diff --git a/src/systemd/vyos-grub-update.service b/src/systemd/vyos-grub-update.service new file mode 100644 index 000000000..522b13a33 --- /dev/null +++ b/src/systemd/vyos-grub-update.service @@ -0,0 +1,14 @@ +[Unit] +Description=Update GRUB loader configuration structure +After=local-fs.target +Before=vyos-router.service + +[Service] +Type=oneshot +ExecStart=/usr/libexec/vyos/system/grub_update.py +TimeoutSec=5 +KillMode=process +StandardOutput=journal+console + +[Install] +WantedBy=vyos-router.service \ No newline at end of file