Page MenuHomeVyOS Platform

Update static host mappings with dhcp v4/v6 static leases
Open, Normal · Public · ENHANCEMENT

Description

Static lease hostnames for DHCPv4 are added to pdns and /etc/hosts; the same process could be applied to DHCPv6 static leases.

For now I'm using a postconfig script:

#!/usr/bin/env python3
from collections.abc import Iterable
import subprocess
from vyos.config import Config
from vyos.hostsd_client import Client as HostsdClient
from vyos.logger import syslog


class HostEntries(dict):
    """Dict of host records queued for delivery to hostsd.

    Keys have the form ``<tag>-<ip>-<domain>``; each value maps an FQDN to
    ``{"address": [...], "aliases": [...]}``.  The mapping itself is the
    payload that :meth:`commit` hands to the client's ``add_hosts``.
    """

    __slots__ = ("_hostsd",)

    def __init__(self, hostsd: HostsdClient | None = None):
        """Start empty, using *hostsd* when given or a new ``HostsdClient``."""
        super().__init__()
        self._hostsd = hostsd or HostsdClient()

    def add(
        self,
        tag: str,
        name: str,
        domain: str,
        ip: str,
        *,
        aliases: Iterable[str] = (),
        assume_new: bool = True,
        log: bool = True,
    ) -> None:
        """Record ``<name>.<domain>`` -> *ip* under the key ``<tag>-<ip>-<domain>``.

        Args:
            tag: Prefix of the entry key.
            name: Host name; joined with *domain* to form the FQDN.
            domain: Domain part of the FQDN, also the key suffix.
            ip: Address stored for the FQDN.
            aliases: Extra names stored alongside the address.
            assume_new: When true, the key is assigned directly, replacing
                whatever was stored under it.  When false, the address and
                aliases are merged into any existing record, skipping
                values that are already present.
            log: When true, write an info line to syslog for this entry.
        """
        fqdn = f"{name}.{domain}"
        alias_list = list(aliases)  # private copy; also accepts generators
        entry_key = f"{tag}-{ip}-{domain}"

        if not assume_new:
            # Merge path: create the nested record only if it is missing.
            record = self.setdefault(entry_key, {}).setdefault(
                fqdn, {"address": [ip], "aliases": []}
            )
            if ip not in record["address"]:
                record["address"].append(ip)
            known_aliases = record["aliases"]
            for alias in alias_list:
                if alias not in known_aliases:
                    known_aliases.append(alias)
        else:
            # Fast path: plain assignment, any previous value is overwritten.
            self[entry_key] = {fqdn: {"address": [ip], "aliases": alias_list}}

        if log:
            syslog.info(f"Added {fqdn},{ip} to hostsd")

    def commit(
        self,
        hostsd: HostsdClient | None = None,
        *,
        apply: bool = True,
        clear_on_success: bool = True,
    ) -> bool:
        """Send the queued entries to hostsd.

        Args:
            hostsd: Client to use for this call; the client given at
                construction time is used when this is omitted.
            apply: Call the client's ``apply()`` after ``add_hosts()``.
            clear_on_success: Empty this mapping once the push succeeded.

        Returns:
            ``True`` when the entries were pushed; ``False`` when there was
            nothing to push or the client raised (the error goes to syslog).
        """
        if not self:
            syslog.info("HostEntries.commit: no entries to apply")
            return False

        target = self._hostsd if not hostsd else hostsd
        try:
            target.add_hosts(self)
            if apply:
                target.apply()
            if clear_on_success:
                self.clear()
            syslog.info("HostEntries.commit: host entries applied")
            return True
        except Exception as err:
            syslog.error(f"HostEntries.commit: failed to apply host entries: {err}")
            return False


def _iter_static_mappings(server_config):
    """Yield ``(pool, pooldef, name, mapping)`` for each static mapping.

    *server_config* is the ``shared-network-name`` dict of a DHCP server;
    ``None`` or missing ``subnet`` / ``static-mapping`` levels yield nothing.
    """
    for pool, pooldef in (server_config or {}).items():
        for subnetdef in (pooldef.get("subnet", {}) or {}).values():
            mappings = subnetdef.get("static-mapping", {}) or {}
            for name, mapping in mappings.items():
                yield pool, pooldef, name, mapping


def main():
    """Publish DHCPv4/DHCPv6 static mappings to hostsd, then restart containers.

    Reads the effective configuration.  DHCPv4 mappings use the shared
    network's ``option domain-name``; DHCPv6 mappings get one entry per
    ``option domain-search`` value.  Both fall back to ``system domain-name``
    and skip the mapping when no domain is available.  If the entries were
    committed, every configured container is restarted through
    ``/usr/bin/vyos-op-run``.
    """
    config = Config()
    hostsd = HostsdClient()

    # Get system domain name
    system_domain = config.return_effective_value("system domain-name", default=None)
    print(f"Loading static dhcp mapping for system domain: {system_domain}")
    syslog.info(f"Loading static dhcp mapping for system domain: {system_domain}")

    # Reuse the client created above so HostEntries does not open a second one.
    hosts = HostEntries(hostsd)

    # Get DHCPv4 static mappings and add to hosts
    dhcp_server = config.get_config_dict(
        ["service", "dhcp-server", "shared-network-name"],
        effective=True,
        get_first_key=True,
    )
    for pool, pooldef, name, mapping in _iter_static_mappings(dhcp_server):
        ip = mapping.get("ip-address")
        if not name or not ip:
            continue
        domain = pooldef.get("option", {}).get("domain-name", system_domain)
        if not domain:
            print(
                f"No domain-name option found for {pool}, no system domain-name set, skipping {name} {ip}"
            )
            continue
        hosts.add("dhcp-server", name, domain, ip)

    # Get DHCPv6 static mappings and add to hosts
    dhcpv6_server = config.get_config_dict(
        ["service", "dhcpv6-server", "shared-network-name"],
        effective=True,
        get_first_key=True,
    )
    for pool, pooldef, name, mapping in _iter_static_mappings(dhcpv6_server):
        ip = mapping.get("ipv6-address")
        if not name or not ip:
            continue
        domains = pooldef.get("option", {}).get("domain-search", [])
        if not isinstance(domains, list):
            # A single value is not wrapped in a list; normalise it.
            domains = [domains]
        if not domains:
            if not system_domain:
                print(
                    f"No domain-search option found for {pool}, no system domain-name set, skipping {name} {ip}"
                )
                continue
            domains = [system_domain]
        for domain in domains:
            hosts.add("dhcpv6-server", name, domain, ip)

    # Commit host entries to hostsd; commit() logs the reason when it fails
    # or when there was nothing to push.
    if not hosts.commit(hostsd):
        return

    print("Hostsd entries applied successfully")
    syslog.info("Hostsd entries applied successfully")

    # Get list of configured containers
    containers = config.get_config_dict(
        ["container", "name"], effective=True, get_first_key=True
    )
    if not isinstance(containers, dict) or not containers:
        syslog.info("No containers configured; skipping restarts")
        return

    # Restart all containers to ensure hosts are reloaded
    print("Restarting all containers to apply new hostsd entries")
    syslog.info("Restarting all containers to apply new hostsd entries")
    for cname in containers:
        try:
            p = subprocess.run(
                ["/usr/bin/vyos-op-run", "restart", "container", cname],
                capture_output=True,
                text=True,
            )
        except OSError as err:
            # The command could not be started at all (e.g. missing binary);
            # report it like any other failure and keep going.
            print(f"> Failed to restart container {cname}: {err}")
            syslog.error(f"Failed to restart container {cname}: {err}")
            continue

        if p.returncode != 0:
            print(f"> Failed to restart container {cname}: {p.stderr.strip()}")
            syslog.error(f"Failed to restart container {cname}: {p.stderr.strip()}")
        else:
            print(f"> Container {cname} restarted successfully")
            syslog.info(f"Container {cname} restarted successfully")


# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

Details

Version
-
Is it a breaking change?
Perfectly compatible
Issue type
Feature (new functionality)