diff --git a/python/vyos/system/disk.py b/python/vyos/system/disk.py index f8e0fd1bf..b8a2c0f35 100644 --- a/python/vyos/system/disk.py +++ b/python/vyos/system/disk.py @@ -1,224 +1,229 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see <http://www.gnu.org/licenses/>. from json import loads as json_loads from os import sync from dataclasses import dataclass from psutil import disk_partitions from vyos.utils.process import run, cmd @dataclass class DiskDetails: """Disk details""" name: str partition: dict[str, str] def disk_cleanup(drive_path: str) -> None: """Clean up disk partition table (MBR and GPT) + Remove partition and device signatures. Zeroize primary and secondary headers - first and last 17408 bytes (512 bytes * 34 LBA) on a drive Args: drive_path (str): path to a drive that needs to be cleaned """ + partitions: list[str] = partition_list(drive_path) + for partition in partitions: + run(f'wipefs -af {partition}') + run(f'wipefs -af {drive_path}') run(f'sgdisk -Z {drive_path}') def find_persistence() -> str: """Find a mountpoint for persistence storage Returns: str: Path where 'persistance' pertition is mounted, Empty if not found """ mounted_partitions = disk_partitions() for partition in mounted_partitions: if partition.mountpoint.endswith('/persistence'): return partition.mountpoint return '' def parttable_create(drive_path: str, root_size: int) -> None: """Create a hybrid MBR/GPT partition table 0-2047 first sectors are free 2048-4095 sectors - BIOS Boot Partition 4096 + 256 MB - EFI system partition Everything else till the end of a drive - Linux partition Args: drive_path (str): path to a drive """ if not root_size: root_size_text: str = '+100%' else: root_size_text: str = str(root_size) command = f'sgdisk -a1 -n1:2048:4095 -t1:EF02 -n2:4096:+256M -t2:EF00 \ -n3:0:+{root_size_text}K -t3:8300 {drive_path}' run(command) # update partitons in kernel sync() run(f'partprobe {drive_path}') partitions: list[str] = partition_list(drive_path) disk: DiskDetails = DiskDetails( name = drive_path, partition = { 'efi': next(x for x in partitions if x.endswith('2')), 'root': next(x for x in partitions if x.endswith('3')) } ) return disk def partition_list(drive_path: str) -> list[str]: """Get a list of partitions on a drive Args: drive_path (str): path to a drive Returns: list[str]: a list of partition paths """ lsblk: str = cmd(f'lsblk -Jp {drive_path}') drive_info: dict = json_loads(lsblk) device: list = drive_info.get('blockdevices') children: list[str] = device[0].get('children', []) if device else [] partitions: list[str] = [child.get('name') for child in children] return partitions def partition_parent(partition_path: str) -> str: """Get a parent device for a partition Args: partition (str): path to a partition Returns: str: path to a parent device """ parent: str = cmd(f'lsblk -ndpo pkname {partition_path}') return parent def from_partition(partition_path: str) -> DiskDetails: drive_path: str = partition_parent(partition_path) partitions: list[str] = partition_list(drive_path) disk: DiskDetails = DiskDetails( name = drive_path, partition = { 'efi': next(x for x in partitions if x.endswith('2')), 'root': next(x for x in partitions if x.endswith('3')) } ) return disk def filesystem_create(partition: str, fstype: str) -> None: """Create a filesystem on a partition Args: partition (str): path to a partition (for example: '/dev/sda1') fstype (str): filesystem type ('efi' or 'ext4') """ if fstype == 'efi': command = 'mkfs -t fat -n EFI' run(f'{command} {partition}') if fstype == 'ext4': command = 'mkfs -t ext4 -L persistence' run(f'{command} {partition}') def partition_mount(partition: str, path: str, fsype: str = '', overlay_params: dict[str, str] = {}) -> bool: """Mount a partition into a path Args: partition (str): path to a partition (for example: '/dev/sda1') path (str): a path where to mount fsype (str): optionally, set fstype ('squashfs', 'overlay', 'iso9660') overlay_params (dict): optionally, set overlay parameters. Defaults to None. Returns: bool: True on success """ if fsype in ['squashfs', 'iso9660']: command: str = f'mount -o loop,ro -t {fsype} {partition} {path}' if fsype == 'overlay' and overlay_params: command: str = f'mount -t overlay -o noatime,\ upperdir={overlay_params["upperdir"]},\ lowerdir={overlay_params["lowerdir"]},\ workdir={overlay_params["workdir"]} overlay {path}' else: command = f'mount {partition} {path}' rc = run(command) if rc == 0: return True return False def partition_umount(partition: str = '', path: str = '') -> None: """Umount a partition by a partition name or a path Args: partition (str): path to a partition (for example: '/dev/sda1') path (str): a path where a partition is mounted """ if partition: command = f'umount {partition}' run(command) if path: command = f'umount {path}' run(command) def find_device(mountpoint: str) -> str: """Find a device by mountpoint Returns: str: Path to device, Empty if not found """ mounted_partitions = disk_partitions() for partition in mounted_partitions: if partition.mountpoint == mountpoint: return partition.mountpoint return '' def disks_size() -> dict[str, int]: """Get a dictionary with physical disks and their sizes Returns: dict[str, int]: a dictionary with name: size mapping """ disks_size: dict[str, int] = {} lsblk: str = cmd('lsblk -Jbp') blk_list = json_loads(lsblk) for device in blk_list.get('blockdevices'): if device['type'] == 'disk': disks_size.update({device['name']: device['size']}) return disks_size diff --git a/python/vyos/system/raid.py b/python/vyos/system/raid.py index 13b99fa69..616d1adf7 100644 --- a/python/vyos/system/raid.py +++ b/python/vyos/system/raid.py @@ -1,115 +1,112 @@ # Copyright 2023 VyOS maintainers and contributors <maintainers@vyos.io> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this library. If not, see <http://www.gnu.org/licenses/>. """RAID related functions""" from pathlib import Path from shutil import copy from dataclasses import dataclass from vyos.utils.process import cmd from vyos.system import disk @dataclass class RaidDetails: """RAID type""" name: str level: str members: list[str] disks: list[disk.DiskDetails] def raid_create(raid_members: list[str], raid_name: str = 'md0', raid_level: str = 'raid1') -> None: """Create a RAID array Args: raid_name (str): a name of array (data, backup, test, etc.) raid_members (list[str]): a list of array members raid_level (str, optional): an array level. Defaults to 'raid1'. """ raid_devices_num: int = len(raid_members) raid_members_str: str = ' '.join(raid_members) if Path('/sys/firmware/efi').exists(): for part in raid_members: drive: str = disk.partition_parent(part) command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' cmd(command) else: for part in raid_members: drive: str = disk.partition_parent(part) command: str = f'sgdisk --typecode=3:A19D880F-05FC-4D3B-A006-743F0F84911E {drive}' cmd(command) - for part in raid_members: - command: str = f'mdadm --zero-superblock {part}' - cmd(command) command: str = f'mdadm --create /dev/{raid_name} -R --metadata=1.0 \ --raid-devices={raid_devices_num} --level={raid_level} \ {raid_members_str}' cmd(command) raid = RaidDetails( name = f'/dev/{raid_name}', level = raid_level, members = raid_members, disks = [disk.from_partition(m) for m in raid_members] ) return raid def update_initramfs() -> None: """Update initramfs""" mdadm_script = '/etc/initramfs-tools/scripts/local-top/mdadm' copy('/usr/share/initramfs-tools/scripts/local-block/mdadm', mdadm_script) p = Path(mdadm_script) p.write_text(p.read_text().replace('$((COUNT + 1))', '20')) command: str = 'update-initramfs -u' cmd(command) def update_default(target_dir: str) -> None: """Update /etc/default/mdadm to start MD monitoring daemon at boot """ source_mdadm_config = '/etc/default/mdadm' target_mdadm_config = Path(target_dir).joinpath('/etc/default/mdadm') target_mdadm_config_dir = Path(target_mdadm_config).parent Path.mkdir(target_mdadm_config_dir, parents=True, exist_ok=True) s = Path(source_mdadm_config).read_text().replace('START_DAEMON=false', 'START_DAEMON=true') Path(target_mdadm_config).write_text(s) def get_uuid(device: str) -> str: """Get UUID of a device""" command: str = f'tune2fs -l {device}' l = cmd(command).splitlines() uuid = next((x for x in l if x.startswith('Filesystem UUID')), '') return uuid.split(':')[1].strip() if uuid else '' def get_uuids(raid_details: RaidDetails) -> tuple[str]: """Get UUIDs of RAID members Args: raid_name (str): a name of array (data, backup, test, etc.) Returns: tuple[str]: root_disk uuid, root_md uuid """ raid_name: str = raid_details.name root_partition: str = raid_details.members[0] uuid_root_disk: str = get_uuid(root_partition) uuid_root_md: str = get_uuid(raid_name) return uuid_root_disk, uuid_root_md