#!/usr/bin/env python3
# SPDX-FileCopyrightText: (C) 2025 Enno Tensing
#
# SPDX-License-Identifier: GPL-2.0-only

"""Util module holding Container and related objects."""

from dataclasses import dataclass
from typing import Self, TypeAlias

from log import Log

ConfigValue: TypeAlias = str | dict | list | bool | None

# Shell line continuation placed after every option of a generated command.
_CONTINUATION = " \\\n"


class ConfigError(Exception):
    """An invalid config."""

    message: str

    def __init__(self, message: str = "") -> None:
        """Create Exception object."""
        self.message = message
        super().__init__(message)

    def __str__(self) -> str:
        """Convert Exception object to a string."""
        return f"Configuration error: {self.message}"


def maybe(json: dict, key: str) -> str | dict | list | bool | None:
    """Return json[key], or None when the key is not present."""
    try:
        return json[key]
    except KeyError:
        return None


def _text(value: ConfigValue) -> str:
    """Convert an optional config value to str; None becomes ""."""
    # str(None) would leak the literal text "None" into generated commands.
    return "" if value is None else str(value)


def _optional_list(json: dict, key: str) -> list | None:
    """Return json[key] if it is a list, [] if absent, None if malformed."""
    value = maybe(json, key)
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return None


def trim(s: str) -> str:
    """Remove sequential whitespace."""
    s = s.replace("\t ", "\t")
    s = s.replace("\t\\\n", " ")
    # Spelled as a product so the two-space literal cannot silently collapse
    # into a single space (which would make this loop never terminate).
    double_space = " " * 2
    while double_space in s:
        s = s.replace(double_space, " ")
    return s


@dataclass
class Volume:
    """Container Volume."""

    name: str
    path: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create a list of volumes from a JSON object of name -> path.

        Returns None when the key is absent or is not an object.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Volume key is present, but malformed.")
            return None
        return [cls(key, value) for key, value in val.items()]

    def command(self) -> str:
        """Option for podman container create."""
        return f"--volume {self.name}:{self.path}"

    def create(self) -> str:
        """Create volume, if it does not exist."""
        cmd = f"# Create volume {self.name}\n"
        cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman volume create '{self.name}'\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Remove volume if it exists."""
        cmd = f"# Remove volume {self.name}\n"
        cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman volume rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd


@dataclass
class Secret:
    """Container Secret."""

    name: str
    secret_type: str
    target: str
    options: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create a list of secrets from a JSON object of name -> settings.

        Entries whose settings are not an object are skipped with a warning.
        Missing "type", "target" or "options" keys are stored as "".
        Returns None when the key is absent or is not an object.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Secret key is present, but malformed!")
            return None
        secrets = []
        for name, settings in val.items():
            if not isinstance(settings, dict):
                logger.log_warning(f"Secret {name} is malformed!")
                continue
            secrets.append(
                cls(
                    name,
                    _text(maybe(settings, "type")),
                    _text(maybe(settings, "target")),
                    _text(maybe(settings, "options")),
                )
            )
        return secrets

    def command(self) -> str:
        """Option for podman container create.

        Empty type/target values are left out; the extra options are only
        appended for secrets of type "mount".
        """
        cmd = f"--secret {self.name}"
        if self.secret_type:
            cmd += f",type={self.secret_type}"
        if self.target:
            cmd += f",target={self.target}"
        # Not a password, ruff...
        if self.secret_type == "mount" and self.options:  # noqa: S105
            cmd += f",{self.options}"
        return cmd

    def create(self) -> str:
        """Create secret if it does not exist."""
        cmd = f"# Create secret {self.name}\n"
        cmd += f"if ! podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += "\thead /dev/urandom \\\n"
        cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
        cmd += "\t\t| head -c 128 \\\n"
        cmd += f"\t\t| podman secret create '{self.name}' -\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Remove secret if it exists."""
        cmd = f"# Remove secret {self.name}\n"
        cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman secret rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd


@dataclass
class Environment:
    """Container environment."""

    variables: list
    file: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from a JSON object of variable name -> value.

        The file path is left empty; the caller is expected to set it.
        """
        if val is None:
            return cls([], "")
        if not isinstance(val, dict):
            logger.log_warning("Environment key is present, but malformed!")
            return cls([], "")
        return cls([f"{key}='{value}'" for key, value in val.items()], "")

    def command(self) -> str:
        """Option for podman container create."""
        return f"--env-file={self.file}"

    def create(self) -> str:
        """Create env file.

        Every variable is appended to the file with one printf call. The
        variable text is used as the printf format, so characters that are
        special in a format are escaped first.
        """
        cmd = f"# Create env-file {self.file}\n"
        for var in self.variables:
            # Keep backslashes and percent signs literal in the format.
            escaped_var = var.replace("\\", "\\\\").replace("%", "%%")
            # A single quote cannot appear inside the single-quoted shell
            # word, so each one becomes a %b placeholder ...
            escaped_var = escaped_var.replace("'", "%b")
            # ... and needs its own "'" argument, otherwise the trailing
            # placeholders expand to nothing and the quote is lost.
            quote_args = " \"'\"" * var.count("'")
            cmd += f"printf '{escaped_var}\\n'{quote_args} >> '{self.file}'\n"
        return cmd

    def remove(self) -> str:
        """Remove env file."""
        cmd = f"# Remove env-file {self.file}\n"
        cmd += f"if [ -e '{self.file}' ]\n"
        cmd += "then\n"
        cmd += f"\trm '{self.file}'\n"
        cmd += "fi\n"
        return cmd


@dataclass
class Ports:
    """Container Ports."""

    tcp_ports: list
    udp_ports: list

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from a JSON object with optional "tcp" and "udp" arrays.

        A missing array is treated as empty; an entry that is present but
        not an array yields a warning and an empty port set.
        """
        if val is None:
            return cls([], [])
        if not isinstance(val, dict):
            logger.log_warning("Ports key is present, but malformed!")
            return cls([], [])
        tcp_ports = _optional_list(val, "tcp")
        if tcp_ports is None:
            logger.log_warning("Key tcp_ports is not an array!")
            return cls([], [])
        udp_ports = _optional_list(val, "udp")
        if udp_ports is None:
            logger.log_warning("Key udp_ports is not an array!")
            return cls([], [])
        return cls(tcp_ports, udp_ports)

    def command(self) -> str:
        """Options for podman container create.

        Returns one tab-indented line per port, each ending in a shell line
        continuation, or "" when no ports are configured.
        """
        lines = [f"\t--publish {port}/tcp" for port in self.tcp_ports]
        lines += [f"\t--publish {port}/udp" for port in self.udp_ports]
        return "".join(f"{line}{_CONTINUATION}" for line in lines)


@dataclass
class Network:
    """Container Network."""

    mode: str
    options: list

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from a JSON object with "mode" and an "options" array.

        Logs an error and returns an empty network when the object or one
        of its two keys is missing or malformed.
        """
        if not isinstance(val, dict):
            logger.log_error("Network configuration is missing or malformed!")
            return cls("", [])
        mode = maybe(val, "mode")
        options = maybe(val, "options")
        if mode is None or not isinstance(options, list):
            err = "Network configuration is missing or has malformed elements!"
            logger.log_error(err)
            return cls("", [])
        return cls(str(mode), options)

    def command(self) -> str:
        """Option for podman container create; "" when no mode is set."""
        if not self.mode:
            return ""
        cmd = f"--network={self.mode}"
        opts = ",".join(self.options)
        if opts:
            cmd += f":{opts}"
        return cmd


@dataclass
class Image:
    """Container Image."""

    registry: str
    image: str
    tag: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
        """Create from a JSON object with "registry", "image" and "tag".

        Missing registry or tag are stored as "". Returns None, after
        logging an error, when the object or its image name is missing.
        """
        if not isinstance(val, dict):
            logger.log_error("Image key either not present or malformed!")
            return None
        image = _text(maybe(val, "image"))
        if not image:
            logger.log_error("Image key either not present or malformed!")
            return None
        registry = _text(maybe(val, "registry"))
        tag = _text(maybe(val, "tag"))
        return cls(registry, image, tag)

    def command(self) -> str:
        """Image reference for podman container create.

        Empty registry and tag parts are omitted from the reference.
        """
        reference = self.image
        if self.registry:
            reference = f"{self.registry}/{reference}"
        if self.tag:
            reference = f"{reference}:{self.tag}"
        return reference


@dataclass
class Capability:
    """Container Capability."""

    cap: str
    mode: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create from a JSON object with optional "add"/"drop" arrays.

        Returns the capabilities to add followed by those to drop, or None
        (with a warning) when the configuration is missing or malformed.
        """
        malformed = "Capabilities key is either missing or malformed!"
        if not isinstance(val, dict):
            logger.log_warning(malformed)
            return None
        add = _optional_list(val, "add")
        drop = _optional_list(val, "drop")
        if add is None or drop is None:
            logger.log_warning(malformed)
            return None
        capabilities = [cls(value, "add") for value in add]
        capabilities += [cls(value, "drop") for value in drop]
        return capabilities

    def command(self) -> str:
        """Option for podman container create."""
        return f"--cap-{self.mode}={self.cap}"


@dataclass
class Dns:
    """Container DNS."""

    servers: list
    search: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> Self:
        """Create from a JSON object with "servers" array and "search".

        Either key may be absent. Logs an error and returns an empty
        configuration when the object is missing or "servers" is not an
        array.
        """
        if not isinstance(val, dict):
            logger.log_error("DNS Key is either missing or malformed!")
            return cls([], "")
        servers = _optional_list(val, "servers")
        if servers is None:
            logger.log_error("Servers key is not an array!")
            return cls([], "")
        return cls(servers, _text(maybe(val, "search")))

    def command(self) -> str:
        """Option(s) for podman container create; "" when nothing is set."""
        parts = []
        if self.search:
            parts.append(f"--dns-search={self.search}")
        if self.servers:
            parts.append(f"--dns={','.join(self.servers)}")
        # Two options span two lines, the second indented like its siblings.
        return f"{_CONTINUATION}\t".join(parts)


class Container:
    """Container."""

    name: str
    image: Image | None
    privileged: bool
    read_only: bool
    replace: bool
    restart: str
    pull_policy: str
    network: Network
    dns: Dns
    ports: Ports
    env: Environment | None
    secrets: list | None
    volumes: list | None
    capabilities: list | None

    def __init__(self, json: dict, logger: Log | None = None) -> None:
        """Create from JSON.

        NOTE(review): when "name" or "image" is missing an error is logged
        and the constructor returns early, leaving no attribute set.
        """
        if logger is None:
            logger = Log("/dev/stdout")
        name = maybe(json, "name")
        if name is None:
            logger.log_error("No container name set, aborting!")
            return
        image = maybe(json, "image")
        if image is None:
            logger.log_error("No image set, aborting!")
            return
        pull_policy = maybe(json, "pull_policy")
        if pull_policy is None:
            pull_policy = "always"
        restart = maybe(json, "restart")
        if restart is None:
            restart = "no"
        self.name = str(name)
        self.image = Image.from_json(image, logger)
        # bool(None) is False, so absent flags are simply disabled.
        self.privileged = bool(maybe(json, "privileged"))
        self.read_only = bool(maybe(json, "read_only"))
        self.replace = bool(maybe(json, "replace"))
        self.pull_policy = str(pull_policy)
        self.restart = str(restart)
        self.network = Network.from_json(maybe(json, "network"), logger)
        self.dns = Dns.from_json(maybe(json, "dns"), logger)
        self.ports = Ports.from_json(maybe(json, "ports"), logger)
        self.env = Environment.from_json(maybe(json, "env"), logger)
        self.env.file = "/var/lib/containerctl/containers/"
        self.env.file += f"{self.name}/environment"
        self.secrets = Secret.from_json(maybe(json, "secrets"), logger)
        self.volumes = Volume.from_json(maybe(json, "volumes"), logger)
        self.capabilities = Capability.from_json(
            maybe(json, "capabilities"), logger
        )

    def create_volumes(self) -> str:
        """Generate podman volume create commands."""
        return "".join(volume.create() for volume in self.volumes or [])

    def create_secrets(self) -> str:
        """Generate podman secret create commands."""
        return "".join(secret.create() for secret in self.secrets or [])

    def create_environment(self) -> str:
        """Wrap self.env.create()."""
        if self.env is None:
            return ""
        return self.env.create()

    def create_container(self) -> str:
        """Generate podman container create command.

        Options are emitted one per line, each followed by a shell line
        continuation; the image reference is the final line.
        """
        options = [f"--name={self.name}"]
        if self.privileged:
            options.append("--privileged")
        if self.replace:
            options.append("--replace")
        if self.read_only:
            options.append("--read-only")
        options.append(f"--restart={self.restart}")
        options.append(f"--pull={self.pull_policy}")
        options.append(self.network.command())
        options.append(self.dns.command())

        cmd = f"# Create container {self.name}\n"
        cmd += f"podman container create{_CONTINUATION}"
        # Network and DNS return "" when unset; skip those entries.
        cmd += "".join(f"\t{opt}{_CONTINUATION}" for opt in options if opt)
        # Ports.command() already returns complete, continued lines.
        cmd += self.ports.command()
        if self.env is not None:
            cmd += f"\t{self.env.command()}{_CONTINUATION}"
        # Each of these lists is None when its config key was absent.
        extras = [
            *(self.secrets or []),
            *(self.volumes or []),
            *(self.capabilities or []),
        ]
        cmd += "".join(f"\t{item.command()}{_CONTINUATION}" for item in extras)
        cmd += f"\t{self.image.command()}\n"
        return cmd

    def start_container(self) -> str:
        """Generate podman container start command."""
        cmd = f"# Start container {self.name}\n"
        cmd += f"podman container start {self.name}\n"
        return cmd

    def stop_container(self) -> str:
        """Generate podman container stop command."""
        cmd = f"# Stop container {self.name}\n"
        cmd += f"podman container stop {self.name}\n"
        return cmd

    def restart_container(self) -> str:
        """Generate podman container restart composite command."""
        cmd = f"# Restart container {self.name}\n"
        cmd += self.stop_container()
        cmd += "sleep 5s\n"
        cmd += self.start_container()
        return cmd

    def upgrade_container(self) -> str:
        """Generate podman container upgrade composite command.

        Removes the container and its env file, then recreates both and
        starts the container again.
        """
        comment = f"# Upgrade container {self.name}\n"
        remove_container = self.remove_container()
        remove_env_file = self.remove_environment()
        create_container = self.create_container()
        create_env_file = self.create_environment()
        start_container = self.start_container()
        remove = f"{remove_container}\n{remove_env_file}\n"
        create = f"{create_env_file}\n{create_container}\n"
        create_and_start = create + "\n" + start_container
        return f"{comment}\n{remove}\n{create_and_start}"

    def purge_container(self) -> str:
        """Generate podman container purge composite command.

        Removes the container together with its volumes, secrets and env
        file.
        """
        remove_container = self.remove_container()
        remove_volumes = self.remove_volumes()
        remove_secrets = self.remove_secrets()
        remove_env_file = self.remove_environment()
        remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}"
        return f"{remove_container}\n{remove_data}"

    def remove_container(self) -> str:
        """Generate podman container remove command (stops it first)."""
        cmd = self.stop_container()
        cmd += f"\n# Remove container {self.name}\n"
        cmd += f"podman container rm {self.name}\n"
        return cmd

    def remove_volumes(self) -> str:
        """Generate podman volume remove commands."""
        return "".join(volume.remove() for volume in self.volumes or [])

    def remove_secrets(self) -> str:
        """Generate podman secret remove commands."""
        return "".join(secret.remove() for secret in self.secrets or [])

    def remove_environment(self) -> str:
        """Wrap self.env.remove()."""
        if self.env is None:
            return ""
        return self.env.remove()