#!/usr/bin/env python3
# SPDX-FileCopyrightText: (C) 2025 Enno Tensing
#
# SPDX-License-Identifier: GPL-2.0-only

"""Util module holding Container and related objects."""

from dataclasses import dataclass
from typing import Self, TypeAlias

from log import Log

ConfigValue: TypeAlias = str | dict | list | bool | None

# Shell line continuation appended after every option of a multi-line
# podman invocation.
_SEPARATOR = " \\\n"


class ConfigError(Exception):
    """An invalid config."""

    message: str

    def __init__(self, message: str = "") -> None:
        """Create Exception object."""
        self.message = message
        super().__init__(message)

    def __str__(self) -> str:
        """Convert Exception object to a string."""
        return f"Configuration error: {self.message}"


def maybe(json: dict, key: str) -> ConfigValue:
    """Return json[key], or None if the key is not present."""
    try:
        return json[key]
    except KeyError:
        return None


def maybe_or(json: dict, key: str, _or: ConfigValue) -> ConfigValue:
    """Return json[key], or _or if it is missing or of another type.

    The value is only accepted when it is an instance of type(_or), so
    the result always has the same type as the fallback.
    """
    val = maybe(json, key)
    if val is None or not isinstance(val, type(_or)):
        return _or
    return val


def _maybe_typed(
    json: dict,
    key: str,
    default: ConfigValue,
    logger: Log,
    warning: str,
) -> ConfigValue:
    """Return json[key]; log warning and use default on a type mismatch.

    A missing (or null) key yields default silently, only a present
    value of the wrong type is reported.
    """
    val = maybe(json, key)
    if val is None:
        return default
    if not isinstance(val, type(default)):
        logger.log_warning(warning)
        return default
    return val


def trim(s: str) -> str:
    """Remove sequential whitespace."""
    s = s.replace("\t ", "\t")
    s = s.replace("\t\\\n", " ")
    # Collapse runs of spaces. The needle has to be two spaces: looking
    # for a single space and replacing it with itself never terminates.
    while "  " in s:
        s = s.replace("  ", " ")
    return s


@dataclass
class Cgroup:
    """cgroup Config."""

    config: list
    parent: str
    namespace: str
    how: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        A missing or malformed value yields an empty config. Keys of the
        wrong type are reported through logger and reset to empty.
        """
        if val is None:
            return cls([], "", "", "")
        if not isinstance(val, dict):
            logger.log_warning("cgroup Config is invalid!")
            return cls([], "", "", "")
        config = _maybe_typed(
            val, "config", [], logger,
            "Config key in cgroup Config is invalid!",
        )
        parent = _maybe_typed(
            val, "parent", "", logger,
            "Parent key in cgroup Config is invalid!",
        )
        namespace = _maybe_typed(
            val, "namespace", "", logger,
            "Namespace key in cgroup Config is invalid!",
        )
        how = _maybe_typed(
            val, "how", "", logger,
            "How key in cgroup Config is invalid!",
        )
        if how == "split" and parent != "":
            logger.log_warning(
                "Split cgroups can not be combined with a cgroup parent!"
            )
            parent = ""
        return cls(config, parent, namespace, how)

    def command(self) -> str:
        """Option for podman container create."""
        # Every option carries its own continuation, so the pieces are
        # concatenated without an additional separator in between.
        cmd = "".join(
            f"\t--cgroup-conf={option}{_SEPARATOR}" for option in self.config
        )
        if self.parent != "":
            cmd += f"\t--cgroup-parent={self.parent}{_SEPARATOR}"
        if self.namespace != "":
            cmd += f"\t--cgroupns={self.namespace}{_SEPARATOR}"
        if self.how != "":
            cmd += f"\t--cgroups={self.how}{_SEPARATOR}"
        return cmd


@dataclass
class Cpu:
    """CPU Accounting."""

    period: str
    quota: str
    shares: str
    number: str
    cpuset_cpus: str
    cpuset_mems: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        Values that are missing or not strings are treated as unset.
        """
        if val is None:
            return cls("", "", "", "", "", "")
        if not isinstance(val, dict):
            logger.log_warning("cpu Config is invalid!")
            return cls("", "", "", "", "", "")
        # An absent cpuset becomes an empty dict, for which both lookups
        # below fall back to the empty string.
        cpuset = maybe_or(val, "cpuset", {})
        return cls(
            maybe_or(val, "period", ""),
            maybe_or(val, "quota", ""),
            maybe_or(val, "shares", ""),
            maybe_or(val, "number", ""),
            maybe_or(cpuset, "cpus", ""),
            maybe_or(cpuset, "mems", ""),
        )

    def command(self) -> str:
        """Option for podman container create."""
        flags = (
            ("--cpu-period", self.period),
            ("--cpu-quota", self.quota),
            ("--cpu-shares", self.shares),
            ("--cpus", self.number),
            ("--cpuset-cpus", self.cpuset_cpus),
            ("--cpuset-mems", self.cpuset_mems),
        )
        return "".join(
            f"\t{flag}={value}{_SEPARATOR}"
            for flag, value in flags
            if value != ""
        )


@dataclass
class Memory:
    """Memory accounting."""

    limit: str
    reservation: str
    swap: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        Without a limit the reservation and swap settings are dropped.
        """
        if val is None:
            return cls("", "", "")
        if not isinstance(val, dict):
            logger.log_warning("memory Config is invalid!")
            return cls("", "", "")
        limit = maybe_or(val, "limit", "")
        if limit == "":
            return cls("", "", "")
        reservation = maybe_or(val, "reservation", "")
        swap = maybe_or(val, "swap", "")
        return cls(limit, reservation, swap)

    def command(self) -> str:
        """Option for podman container create."""
        if self.limit == "":
            return ""
        cmd = f"\t--memory={self.limit}{_SEPARATOR}"
        if self.reservation != "":
            cmd += f"\t--memory-reservation={self.reservation}{_SEPARATOR}"
        if self.swap != "":
            cmd += f"\t--memory-swap={self.swap}{_SEPARATOR}"
        return cmd


@dataclass
class Accounting:
    """Resource Accounting."""

    cgroup: Cgroup
    cpu: Cpu
    memory: Memory

    @classmethod
    def from_json(cls, data: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        A missing or malformed value yields empty cgroup, cpu and memory
        settings.
        """
        if data is not None and not isinstance(data, dict):
            # Indexing a non-dict by key would raise, so treat it like
            # the sibling classes treat their malformed input.
            logger.log_warning("accounting Config is invalid!")
            data = None
        if data is None:
            return cls(
                Cgroup.from_json(None, logger),
                Cpu.from_json(None, logger),
                Memory.from_json(None, logger),
            )
        return cls(
            Cgroup.from_json(maybe(data, "cgroup"), logger),
            Cpu.from_json(maybe(data, "cpu"), logger),
            Memory.from_json(maybe(data, "memory"), logger),
        )

    def command(self) -> str:
        """Options for podman container create."""
        return (
            self.cgroup.command()
            + self.cpu.command()
            + self.memory.command()
        )


@dataclass
class Volume:
    """Container Volume."""

    name: str
    path: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create from JSON.

        Returns one Volume per key/value pair, or None if the value is
        missing or not an object.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Volume key is present, but malformed.")
            return None
        return [cls(key, value) for key, value in val.items()]

    def is_host_volume(self) -> bool:
        """Check if this Volume is a named or a host volume."""
        return self.name.startswith(("/", "."))

    def command(self) -> str:
        """Option for podman container create."""
        return f"--volume {self.name}:{self.path}"

    def create(self) -> str:
        """Create volume, if it does not exist."""
        if self.is_host_volume():
            # A HOST-DIR starting with a slash or dot is interpreted as a
            # directory or file on the host machine. Since the script
            # should not modify things outside its own tree, i.e.
            # /var/lib/containerctl, ignore it.
            return ""
        cmd = f"# Create volume {self.name}\n"
        cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman volume create '{self.name}'\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Remove volume if it exists."""
        if self.is_host_volume():
            # As with create, don't touch the host system
            return ""
        cmd = f"# Remove volume {self.name}\n"
        cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman volume rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd


@dataclass
class Secret:
    """Container Secret."""

    name: str
    secret_type: str
    target: str
    options: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create from JSON.

        Returns one Secret per well-formed entry, or None if the value
        is missing or not an object. Malformed entries are skipped with
        a warning.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Secret key is present, but malformed!")
            return None
        secrets = []
        for name, spec in val.items():
            if not isinstance(spec, dict):
                logger.log_warning(f"Secret {name} is malformed!")
                continue
            secrets.append(
                cls(
                    name,
                    str(maybe_or(spec, "type", "")),
                    str(maybe_or(spec, "target", "")),
                    str(maybe_or(spec, "options", "")),
                )
            )
        return secrets

    def command(self) -> str:
        """Option for podman container create."""
        # podman expects "--secret name[,opt=value ...]"; unset options
        # are left out instead of being passed with an empty value.
        parts = [self.name]
        if self.secret_type != "":
            parts.append(f"type={self.secret_type}")
        if self.target != "":
            parts.append(f"target={self.target}")
        # Not a password, ruff...
        if self.secret_type == "mount" and self.options != "":  # noqa: S105
            parts.append(self.options)
        return "--secret " + ",".join(parts)

    def create(self) -> str:
        """Create secret if it does not exist."""
        cmd = f"# Create secret {self.name}\n"
        cmd += f"if ! podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += "\thead /dev/urandom \\\n"
        cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
        cmd += "\t\t| head -c 128 \\\n"
        cmd += f"\t\t| podman secret create '{self.name}' -\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Remove secret if it exists."""
        cmd = f"# Remove secret {self.name}\n"
        cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman secret rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd


@dataclass
class Environment:
    """Container environment."""

    variables: list
    file: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        Every key/value pair becomes a "key='value'" entry. The file
        path is left empty and has to be set by the caller.
        """
        if val is None:
            return cls([], "")
        if not isinstance(val, dict):
            logger.log_warning("Environment key is present, but malformed!")
            return cls([], "")
        return cls([f"{key}='{value}'" for key, value in val.items()], "")

    def command(self) -> str:
        """Option for podman container create."""
        return f"--env-file={self.file}"

    def create(self) -> str:
        """Create env file."""
        cmd = f"# Create env-file {self.file}\n"
        for var in self.variables:
            # The single quotes around the value cannot appear inside the
            # single-quoted printf format, so each one is replaced by %b
            # and supplied as a printf argument instead.
            escaped_var = var.replace("'", "%b")
            cmd += f"printf '{escaped_var}\\n' \"'\" \"'\" >> '{self.file}'\n"
        return cmd

    def remove(self) -> str:
        """Remove env file."""
        cmd = f"# Remove env-file {self.file}\n"
        cmd += f"if [ -e '{self.file}' ]\n"
        cmd += "then\n"
        cmd += f"\trm '{self.file}'\n"
        cmd += "fi\n"
        return cmd


@dataclass
class Ports:
    """Container Ports."""

    tcp_ports: list
    udp_ports: list

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        The tcp and udp keys are read independently: a missing key means
        no ports for that protocol, a key that is not an array is
        reported and ignored.
        """
        if val is None:
            return cls([], [])
        if not isinstance(val, dict):
            logger.log_warning("Ports key is present, but malformed!")
            return cls([], [])
        tcp_ports = _maybe_typed(
            val, "tcp", [], logger, "Key tcp_ports is not an array!"
        )
        udp_ports = _maybe_typed(
            val, "udp", [], logger, "Key udp_ports is not an array!"
        )
        return cls(tcp_ports, udp_ports)

    def command(self) -> str:
        """Option for podman container create."""
        # podman publishes ports with --publish; it has no --port option.
        tcp = "".join(
            f"\t--publish {port}/tcp{_SEPARATOR}" for port in self.tcp_ports
        )
        udp = "".join(
            f"\t--publish {port}/udp{_SEPARATOR}" for port in self.udp_ports
        )
        return tcp + udp


@dataclass
class Network:
    """Container Network."""

    mode: str
    options: list

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        Both the mode and the options key are required; if either is
        missing or malformed an error is logged and an empty Network is
        returned.
        """
        if val is None or not isinstance(val, dict):
            logger.log_error("Network configuration is missing or malformed!")
            return cls("", [])
        mode = maybe(val, "mode")
        options = maybe(val, "options")
        if mode is None or options is None or not isinstance(options, list):
            err = "Network configuration is missing or has malformed elements!"
            logger.log_error(err)
            return cls("", [])
        return cls(str(mode), options)

    def command(self) -> str:
        """Option for podman container create."""
        if self.mode == "":
            return ""
        cmd = f"--network={self.mode}"
        # str() keeps the join from failing on non-string JSON values.
        opts = ",".join(map(str, self.options))
        if opts != "":
            cmd += f":{opts}"
        return cmd


@dataclass
class Image:
    """Container Image."""

    registry: str
    image: str
    tag: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
        """Create from JSON.

        Returns None, after logging an error, if the value is missing or
        not an object.
        """
        if val is None or not isinstance(val, dict):
            logger.log_error("Image key either not present or malformed!")
            return None
        registry = maybe_or(val, "registry", "")
        image = maybe_or(val, "image", "")
        tag = maybe_or(val, "tag", "")
        return cls(str(registry), str(image), str(tag))

    def command(self) -> str:
        """Option for podman container create."""
        # Leave out an unset registry or tag rather than emitting a
        # dangling "/" or ":" in the image reference.
        reference = self.image
        if self.registry != "":
            reference = f"{self.registry}/{reference}"
        if self.tag != "":
            reference = f"{reference}:{self.tag}"
        return reference


@dataclass
class Capability:
    """Container Capability."""

    cap: str
    mode: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create from JSON.

        Returns the capabilities to add followed by those to drop, or
        None if the value is missing or not an object. Either of the
        add and drop keys may be left out.
        """
        if val is None or not isinstance(val, dict):
            logger.log_warning(
                "Capabilities key is either missing or malformed!"
            )
            return None
        add = _maybe_typed(
            val, "add", [], logger,
            "Add key in capabilities Config is not an array!",
        )
        drop = _maybe_typed(
            val, "drop", [], logger,
            "Drop key in capabilities Config is not an array!",
        )
        return [cls(value, "add") for value in add] + [
            cls(value, "drop") for value in drop
        ]

    def command(self) -> str:
        """Option for podman container create."""
        return f"--cap-{self.mode}={self.cap}"


@dataclass
class Dns:
    """Container DNS."""

    servers: list
    search: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from JSON.

        An error is logged and an empty Dns returned if the value is
        missing or malformed, or if the servers key is not an array.
        """
        if val is None or not isinstance(val, dict):
            logger.log_error("DNS Key is either missing or malformed!")
            return cls([], "")
        search = maybe_or(val, "search", "")
        servers = maybe(val, "servers")
        if not isinstance(servers, list):
            logger.log_error("Servers key is not an array!")
            return cls([], "")
        return cls(servers, str(search))

    def command(self) -> str:
        """Option for podman container create."""
        parts = []
        if self.search != "":
            parts.append(f"--dns-search={self.search}")
        if self.servers:
            # str() keeps the join from failing on non-string JSON values.
            parts.append("--dns=" + ",".join(map(str, self.servers)))
        # With both options set they are put on separate, continued lines.
        return f"{_SEPARATOR}\t".join(parts)


class Container:
    """Container."""

    name: str
    image: Image | None
    privileged: bool
    read_only: bool
    replace: bool
    restart: str
    pull_policy: str
    timezone: str
    network: Network
    dns: Dns
    ports: Ports
    env: Environment | None
    secrets: list | None
    volumes: list | None
    capabilities: list | None
    accounting: Accounting

    def __init__(self, json: dict, logger: Log | None = None) -> None:
        """Create from JSON.

        Without an explicit logger, messages are written to /dev/stdout.
        """
        if logger is None:
            logger = Log("/dev/stdout")
        # NOTE(review): both early returns below leave the instance
        # without any attributes set, so later method calls would fail.
        # Raising ConfigError may be what is wanted - confirm with the
        # callers before changing it.
        name = maybe(json, "name")
        if name is None:
            logger.log_error("No container name set, aborting!")
            return
        image = maybe(json, "image")
        if image is None:
            logger.log_error("No image set, aborting!")
            return
        self.name = str(name)
        self.image = Image.from_json(image, logger)
        # bool(None) is False, so a missing flag counts as disabled.
        self.privileged = bool(maybe(json, "privileged"))
        self.read_only = bool(maybe(json, "read_only"))
        self.replace = bool(maybe(json, "replace"))
        self.pull_policy = str(maybe_or(json, "pull_policy", "always"))
        self.restart = str(maybe_or(json, "restart", "no"))
        self.timezone = str(maybe_or(json, "timezone", "local"))
        self.network = Network.from_json(maybe(json, "network"), logger)
        self.dns = Dns.from_json(maybe(json, "dns"), logger)
        self.ports = Ports.from_json(maybe(json, "ports"), logger)
        self.env = Environment.from_json(maybe(json, "env"), logger)
        self.env.file = (
            f"/var/lib/containerctl/containers/{self.name}/environment"
        )
        self.secrets = Secret.from_json(maybe(json, "secrets"), logger)
        self.volumes = Volume.from_json(maybe(json, "volumes"), logger)
        self.capabilities = Capability.from_json(
            maybe(json, "capabilities"), logger
        )
        self.accounting = Accounting.from_json(
            maybe(json, "accounting"), logger
        )

    def create_volumes(self) -> str:
        """Generate podman volume create commands."""
        if self.volumes is None:
            return ""
        return "".join(volume.create() for volume in self.volumes)

    def create_secrets(self) -> str:
        """Generate podman secret create commands."""
        if self.secrets is None:
            return ""
        return "".join(secret.create() for secret in self.secrets)

    def create_environment(self) -> str:
        """Wrap self.env.create()."""
        if self.env is None:
            return ""
        return self.env.create()

    def create_container(self) -> str:
        """Generate podman container create command."""
        options = [f"\t--name={self.name}{_SEPARATOR}"]
        if self.privileged:
            options.append(f"\t--privileged{_SEPARATOR}")
        if self.replace:
            options.append(f"\t--replace{_SEPARATOR}")
        if self.read_only:
            options.append(f"\t--read-only{_SEPARATOR}")
        options.append(f"\t--restart={self.restart}{_SEPARATOR}")
        options.append(f"\t--pull={self.pull_policy}{_SEPARATOR}")
        options.append(f"\t--tz={self.timezone}{_SEPARATOR}")
        # Network and DNS may have nothing to contribute; skip them then
        # instead of emitting an empty continuation line.
        for option in (self.network.command(), self.dns.command()):
            if option != "":
                options.append(f"\t{option}{_SEPARATOR}")
        options.append(self.ports.command())
        if self.env is not None:
            options.append(f"\t{self.env.command()}{_SEPARATOR}")
        # Each of these is None when it is not configured.
        for group in (self.secrets, self.volumes, self.capabilities):
            options.extend(
                f"\t{item.command()}{_SEPARATOR}" for item in group or []
            )
        options.append(self.accounting.command())
        cmd = f"# Create container {self.name}\n"
        cmd += f"podman container create{_SEPARATOR}"
        cmd += "".join(options)
        cmd += f"\t{self.image.command()}\n"
        return cmd

    def start_container(self) -> str:
        """Generate podman container start command."""
        cmd = f"# Start container {self.name}\n"
        cmd += f"podman container start {self.name}\n"
        return cmd

    def stop_container(self) -> str:
        """Generate podman container stop command."""
        cmd = f"# Stop container {self.name}\n"
        cmd += f"podman container stop {self.name}\n"
        return cmd

    def restart_container(self) -> str:
        """Generate podman container restart composite command."""
        cmd = f"# Restart container {self.name}\n"
        cmd += self.stop_container()
        cmd += "sleep 5s\n"
        cmd += self.start_container()
        return cmd

    def upgrade_container(self) -> str:
        """Generate podman container upgrade composite command.

        The container and its env file are removed and created again,
        then the container is started.
        """
        comment = f"# Upgrade container {self.name}\n"
        remove_container = self.remove_container()
        remove_env_file = self.remove_environment()
        create_container = self.create_container()
        create_env_file = self.create_environment()
        start_container = self.start_container()
        remove = f"{remove_container}\n{remove_env_file}\n"
        create = f"{create_env_file}\n{create_container}\n"
        create_and_start = create + "\n" + start_container
        return f"{comment}\n{remove}\n{create_and_start}"

    def purge_container(self) -> str:
        """Generate podman container purge composite command.

        Removes the container together with its volumes, secrets and
        env file.
        """
        remove_container = self.remove_container()
        remove_volumes = self.remove_volumes()
        remove_secrets = self.remove_secrets()
        remove_env_file = self.remove_environment()
        remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}"
        return f"{remove_container}\n{remove_data}"

    def remove_container(self) -> str:
        """Generate podman container remove command."""
        cmd = self.stop_container()
        cmd += f"\n# Remove container {self.name}\n"
        cmd += f"podman container rm {self.name}\n"
        return cmd

    def remove_volumes(self) -> str:
        """Generate podman volume remove commands."""
        if self.volumes is None:
            return ""
        return "".join(volume.remove() for volume in self.volumes)

    def remove_secrets(self) -> str:
        """Generate podman secret remove commands."""
        if self.secrets is None:
            return ""
        return "".join(secret.remove() for secret in self.secrets)

    def remove_environment(self) -> str:
        """Wrap self.env.remove()."""
        if self.env is None:
            return ""
        return self.env.remove()