From 995a66c753bec44d31d1aa708322d0967a7e680a Mon Sep 17 00:00:00 2001 From: Enno Tensing Date: Tue, 22 Jul 2025 13:00:41 +0200 Subject: [PATCH] generate: Add python script to generate control scripts Generates control scripts from a given container config. For now the actual generate.py does nothing. Signed-off-by: Enno Tensing --- generate/container.py | 548 ++++++++++++++++++++++++++++++++++++++++++ generate/generate.py | 77 ++++++ generate/log.py | 74 ++++++ 3 files changed, 699 insertions(+) create mode 100644 generate/container.py create mode 100644 generate/generate.py create mode 100644 generate/log.py diff --git a/generate/container.py b/generate/container.py new file mode 100644 index 0000000..6cf255c --- /dev/null +++ b/generate/container.py @@ -0,0 +1,548 @@ +#!/usr/bin/env python3 +"""Util module holding Container and related objects.""" + +from dataclasses import dataclass +from pathlib import Path +from typing import Self, TypeAlias + +from log import Log + +ConfigValue: TypeAlias = str | dict | list | bool | None + + +class ConfigError(Exception): + """An invalid config.""" + + message: str + + def __init__(self, message: str = "") -> None: + """Create Exception object.""" + self.message = message + self.super.__init__(message) + + def __str__(self) -> str: + """Convert Exception object to a string.""" + return f"Configuration error: {self.message}" + + +def join(data: list, separator: str = ",") -> str: + """Join a list together.""" + ret: str = "" + x = data.copy() + if len(x) == 0: + return ret + while len(x) > 1: + ret += x.pop() + separator + ret += x.pop() + return ret + + +def maybe(json: dict, key: str) -> str | dict | list | bool | None: + """Maybe get a value.""" + try: + return json[key] + except KeyError: + return None + + +def command_join(items: list | None) -> str: + """Like join(), but call command().""" + joined = "" + if items is not None: + for item in items: + joined += " " + item.command() + return joined + + +def trim(s: str) -> str: + """Remove sequential whitespace.""" + s = s.replace("\t ", "\t") + s = s.replace("\t\\\n", " ") + while s.__contains__(" "): + s = s.replace(" ", " ") + return s + + +@dataclass +class Volume: + """Container Volume.""" + + name: str + path: str + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> list | None: + """Create from JSON.""" + if val is None: + return None + if not isinstance(val, dict): + logger.log_warning("Volume key is present, but malformed.") + return None + return [cls(key, value) for key, value in val.items()] + + def command(self) -> str: + """Option for podman container create.""" + return f"--volume {self.name}:{self.path}" + + def create(self) -> str: + """Create volume, if it does not exist.""" + cmd = f"# Create volume {self.name}\n" + cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n" + cmd += "then\n" + cmd += "\tpodman volume create '{self.name}'\n" + cmd += "fi\n" + return cmd + + def remove(self) -> str: + """Remove volume if it exists.""" + cmd = f"# Remove volume {self.name}\n" + cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n" + cmd += "then\n" + cmd += f"\tpodman volume rm '{self.name}'\n" + cmd += "fi\n" + return cmd + + +@dataclass +class Secret: + """Container Secret.""" + + name: str + secret_type: str + target: str + options: str + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> list | None: + """Create from JSON.""" + if val is None: + return None + if not isinstance(val, dict): + logger.log_warning("Secret key is present, but malformed!") + return None + secrets = [] + for key in val: + if not isinstance(val[key], dict): + logger.log_warning(f"Secret {key} is malformed!") + continue + name = key + secret_type = maybe(val[key], "type") + target = maybe(val[key], "target") + options = maybe(val[key], "options") + if options is None: + options = "" + secrets.append( + cls(name, str(secret_type), str(target), str(options)) + ) + + return secrets + + def command(self) -> str: + """Option for podman container create.""" + cmd = ( + f"--secret {self.name},type={self.secret_type},target={self.target}" + ) + if self.target == "mount" and self.options != "": + cmd = f"{cmd},{self.options}" + + return cmd + + def create(self) -> str: + """Create secret if it does not exist.""" + cmd = f"# Create secret {self.name}\n" + cmd += f"if ! podman secret exists '{self.name}' 2> /deb/null\n" + cmd += "then\n" + cmd += "\thead /dev/urandom \\\n" + cmd += "\t\t| tr -dc A-Za-z0-9 \\\n" + cmd += "\t\t| head -c 128 \\\n" + cmd += f"\t\t| podman secret create '{self.name}' -\n" + cmd += "fi\n" + return cmd + + def remove(self) -> str: + """Remove secret if it exists.""" + cmd = f"# Remove secret {self.name}\n" + cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n" + cmd += "then\n" + cmd += f"\tpodman secret rm '{self.name}'\n" + cmd += "fi\n" + return cmd + + +@dataclass +class Environment: + """Container environment.""" + + variables: list + file: str + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> Self: + """Create from JSON.""" + if val is None: + return cls([], "") + if not isinstance(val, dict): + logger.log_warning("Environment key is present, but malformed!") + return cls([], "") + return cls([f"{key}='{value}'" for key, value in val.items()], "") + + def command(self) -> str: + """Option for podman container create.""" + return f"--env-file={self.file}" + + def create(self) -> str: + """Create env file.""" + file = Path(self.file) + cmd = f"# Create env-file {self.file}\n" + cmd += f"if ! [ -d '{file.parent}' ]\n" + cmd += "then\n" + cmd += f"\tmkdir --parents '{file.parent}'\n" + cmd += "fi\n" + for var in self.variables: + escaped_var = var.replace("'", "%b") + cmd += f"printf '{escaped_var}\\n' \"'\" >> '{self.file}'\n" + + return cmd + + def remove(self) -> str: + """Remove env file.""" + cmd = f"# Remove env-file {self.file}\n" + cmd += f"if [ -e '{self.file}' ]\n" + cmd += "then\n" + cmd += "\trm '{self.file}'\n" + cmd += "fi\n" + return cmd + + +@dataclass +class Ports: + """Container Ports.""" + + tcp_ports: list + udp_ports: list + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> Self: + """Create from JSON.""" + if val is None: + return cls([], []) + if not isinstance(val, dict): + logger.log_warning("Ports key is present, but malformed!") + return cls([], []) + tcp_ports = maybe(val, "tcp") + udp_ports = maybe(val, "udp") + if not isinstance(tcp_ports, list): + logger.log_warning("Key tcp_ports is not an array!") + return cls([], []) + if not isinstance(udp_ports, list): + logger.log_warning("Key udp_ports is not an array!") + return cls([], []) + return cls(tcp_ports, udp_ports) + + def command(self) -> str: + """Option for podman container create.""" + ports = "" + for port in self.tcp_ports: + ports += f"\t--port {port}/tcp \\\n" + for port in self.udp_ports: + ports += f"\t--port {port}/udp \\\n" + return ports + + +@dataclass +class Network: + """Container Network.""" + + mode: str + options: list + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> Self: + """Create from JSON.""" + if val is None or not isinstance(val, dict): + logger.log_error("Network configuration is missing or malformed!") + return cls("", []) + mode = maybe(val, "mode") + options = maybe(val, "options") + if mode is None or options is None or not isinstance(options, list): + err = "Network configuration is missing or has malformed elements!" + logger.log_error(err) + return cls("", []) + return cls(str(mode), options) + + def command(self) -> str: + """Option for podman container create.""" + if self.mode == "": + return "" + cmd = f"--network={self.mode}" + opts = join(self.options) + if opts != "": + cmd += f":{opts}" + return cmd + + +@dataclass +class Image: + """Container Image.""" + + registry: str + image: str + tag: str + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> Self | None: + """Create from JSON.""" + if val is None or not isinstance(val, dict): + logger.log_error("Image key either not present or malformed!") + return None + registry = maybe(val, "registry") + image = maybe(val, "image") + tag = maybe(val, "tag") + return cls(str(registry), str(image), str(tag)) + + def command(self) -> str: + """Option for podman container create.""" + return f"{self.registry}/{self.image}:{self.tag}" + + +@dataclass +class Capability: + """Container Capability.""" + + cap: str + mode: str + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> list | None: + """Create from JSON.""" + if val is None or not isinstance(val, dict): + logger.log_warning( + "Capabilities key is either missing or malformed!" + ) + return None + add = [cls(value, "add") for value in val["add"]] + drop = [cls(value, "drop") for value in val["drop"]] + return add + drop + + def command(self) -> str: + """Option for podman container create.""" + return f"--cap-{self.mode}={self.cap}" + + +@dataclass +class Dns: + """Container DNS.""" + + servers: list + search: str + + @classmethod + def from_json(cls, val: ConfigValue, logger: Log) -> Self: + """Create from JSON.""" + if val is None or not isinstance(val, dict): + logger.log_error("DNS Key is either missing or malformed!") + return cls([], "") + search = maybe(val, "search") + servers = maybe(val, "servers") + if not isinstance(servers, list): + logger.log_error("Servers key is not an array!") + return cls([], "") + return cls(servers, str(search)) + + def command(self) -> str: + """Option for podman container create.""" + if len(self.servers) == 0 and self.serach == "": + return "" + if len(self.servers) == 0: + return f"--dns-search={self.search}" + if self.search == "": + return f"--dns={join(self.servers)}" + + cmd = f"--dns-search={self.search} \\\n\t--dns=" + cmd += join(self.servers) + + return cmd + + +class Container: + """Container.""" + + name: str + image: Image + privileged: bool + read_only: bool + replace: bool + restart: str + pull_policy: str + network: Network + dns: Dns + ports: Ports + env: Environment | None + secrets: list | None + volumes: list | None + capabilities: list | None + + def __init__(self, json: dict, logger: Log | None = None) -> None: + """Create from JSON.""" + if logger is None: + logger = Log("/dev/stdout") + name = maybe(json, "name") + if name is None: + logger.log_error("No container name set, aborting!") + return + image = maybe(json, "image") + if image is None: + logger.log_error("No image set, aborting!") + return + privileged = maybe(json, "privileged") + read_only = maybe(json, "read_only") + replace = maybe(json, "replace") + pull_policy = maybe(json, "pull_policy") + if pull_policy is None: + pull_policy = "always" + restart = maybe(json, "restart") + if restart is None: + restart = "no" + network = maybe(json, "network") + dns = maybe(json, "dns") + ports = maybe(json, "ports") + env = maybe(json, "env") + secrets = maybe(json, "secrets") + volumes = maybe(json, "volumes") + capabilities = maybe(json, "capabilities") + self.name = str(name) + self.image = Image.from_json(image, logger) + self.privileged = privileged is not None and bool(privileged) + self.read_only = read_only is not None and bool(read_only) + self.replace = replace is not None and bool(replace) + self.pull_policy = str(pull_policy) + self.restart = str(restart) + self.network = Network.from_json(network, logger) + self.dns = Dns.from_json(dns, logger) + self.ports = Ports.from_json(ports, logger) + self.env = Environment.from_json(env, logger) + self.env.file = "/var/lib/containerctl/containers/" + self.env.file += f"{self.name}/environment" + self.secrets = Secret.from_json(secrets, logger) + self.volumes = Volume.from_json(volumes, logger) + self.capabilities = Capability.from_json(capabilities, logger) + + def create(self) -> str: + """Generate podman container create command.""" + cmd = f"# Create container {self.name}" + cmd += "podman container crate \\\n" + cmd += f"\t--name={self.name} \\\n" + if self.privileged: + cmd += "\t--privileged \\\n" + if self.replace: + cmd += "\t--replace \\\n" + if self.read_only: + cmd += "\t--read-only \\\n" + cmd += f"\t--restart={self.restart} \\\n" + cmd += f"\t--pull={self.pull_policy} \\\n" + cmd += f"\t{self.network.command()} \\\n" + cmd += f"\t{self.dns.command()} \\\n" + cmd += f"{self.ports.command()}" + if self.env is not None: + cmd += f"\t{self.env.command()} \\\n" + for secret in self.secrets: + cmd += f"\t{secret.command()} \\\n" + for volume in self.volumes: + cmd += f"\t{volume.command()} \\\n" + for capability in self.capabilities: + cmd += f"\t{capability.command()} \\\n" + cmd += f"\t{self.image.command()}\n" + return cmd + + def create_volumes(self) -> str: + """Generate podman volume create commands.""" + if self.volumes is None: + return "" + volumes = "" + for volume in self.volumes: + volumes += volume.create() + return volumes + + def create_secrets(self) -> str: + """Generate podman secret create commands.""" + if self.secrets is None: + return "" + secrets = "" + for secret in self.secrets: + secrets += secret.create() + return secrets + + def create_container(self) -> str: + """Create podman container.""" + cmd = "" + if self.env is not None: + cmd += self.env.create() + cmd += self.create() + return cmd + + def start_container(self) -> str: + """Generate podman container start command.""" + cmd = f"# Start container {self.name}\n" + cmd += f"podman container start {self.name}\n" + return cmd + + def stop_container(self) -> str: + """Generate podman container stop command.""" + cmd = f"# Stop container {self.name}\n" + cmd += f"podman container stop {self.name}\n" + return cmd + + def restart_container(self) -> str: + """Generate podman container restart composite command.""" + cmd = f"# Restart container {self.name}\n" + cmd += self.stop_container() + cmd += "sleep 5s\n" + cmd += self.start_container() + return cmd + + def upgrade_container(self) -> str: + """Generate podman container upgrade composite command.""" + comment = f"# Upgrade container {self.name}\n" + remove_container = self.remove_container() + remove_env_file = self.env.remove() + create_container = self.create_container() + start_container = self.start_container() + remove = f"{remove_container}\n{remove_env_file}\n" + create_and_start = create_container + "\n" + start_container + return f"{comment}\n{remove}\n{create_and_start}" + + def purge_container(self) -> str: + """Generate podman container purge composite command.""" + remove_container = self.remove_container() + remove_volumes = self.remove_volumes() + remove_secrets = self.remove_secrets() + remove_env_file = "" + if self.env is not None: + remove_env_file = self.env.remove() + remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}" + return f"{remove_container}\n{remove_data}" + + def remove_container(self) -> str: + """Generate podman container remove command.""" + cmd = self.stop_container() + cmd += f"\n# Remove container {self.name}\n" + cmd += f"podman container rm {self.name}\n" + return cmd + + def remove_volumes(self) -> str: + """Generate podman volume remove commands.""" + if self.volumes is None: + return "" + volumes = "" + for volume in self.volumes: + volumes += volume.remove() + return volumes + + def remove_secrets(self) -> str: + """Generate podman secret remove commands.""" + if self.secrets is None: + return "" + secrets = "" + for secret in self.secrets: + secrets += secret.remove() + return secrets diff --git a/generate/generate.py b/generate/generate.py new file mode 100644 index 0000000..e3cd732 --- /dev/null +++ b/generate/generate.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +"""Generate the control files..""" + +import json +import sys +from pathlib import Path + +from container import ConfigError, Container +from log import Log + + +def load_container_config(file: Path, log: Log) -> dict | None: + """Load a container config.""" + if not file.exists(): + log.log_error(f"{file.name} does not exist!") + return None + data = {} + with file.open("r", encoding="utf-8") as fp: + try: + data = json.load(fp) + except json.JSONDecodeError as e: + log.log_error(f"{file.name} is not a valid JSON file!") + log.log_debug(f"Exception: {e}") + return None + except UnicodeDecodeError as e: + log.log_error(f"{file.name} is not a valid UTF8 file!") + log.log_debug(f"Exception: {e}") + return None + except OSError as e: + log.log_error(f"{file.name} could not be read!") + log.log_debug(f"Exception: {e}") + return None + return data + + +def create_container_from_config(data: dict, log: Log) -> Container | None: + """Create a container object.""" + log.log_info("Creating container...") + log.log_debug(f"Container config is:\n{data}") + ct: Container | None = None + try: + ct = Container(data) + except ConfigError as e: + log.log_error(f"{e}") + return None + return ct + + +def main() -> None: + """Run the program.""" + argc_threshold = 2 + if len(sys.argv) < argc_threshold: + logger = Log("/dev/stdout") + logger.log_error("No arguments passed!") + sys.exit(1) + config_file = "" + log_file = "" + if len(sys.argv) >= argc_threshold: + config_file = sys.argv[1] + if len(sys.argv) > argc_threshold: + log_file = sys.argv[2] + logger = Log(log_file) + data = load_container_config(Path(config_file), logger) + if data is None: + logger.log_error(f"{config_file} is invalid, aborting!") + sys.exit(1) + try: + ct = create_container_from_config(data, logger) + except Exception as e: # noqa: BLE001 + logger.log_error(e) + sys.exit(1) + print(ct) + print(vars(ct)) + + +if __name__ == "__main__": + main() diff --git a/generate/log.py b/generate/log.py new file mode 100644 index 0000000..f343131 --- /dev/null +++ b/generate/log.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +"""Implement a simple logging framework.""" + +import datetime +from pathlib import Path +from typing import TypeAlias + +UNKNOWN = -1 +ERROR = 0 +WARNING = 1 +INFO = 2 +DEBUG = 3 + +LogLevel: TypeAlias = int + + +class Log: + """Class for Logging.""" + + level: LogLevel = ERROR + messages: list = [] + logfile: Path + + def __init__(self, path: str) -> None: + """Init for Log.""" + if path == "": + path = "/dev/stdout" + self.logfile = Path(path) + + def log_error(self, msg: str) -> None: + """Log an error.""" + if self.level >= ERROR: + now = self.timestamp() + prefix = "EE" + log_message = f"[{now}] ({prefix}) {msg}" + self.write_message(log_message) + + def log_warning(self, msg: str) -> None: + """Log a warning.""" + if self.level >= WARNING: + now = self.timestamp() + prefix = "WW" + log_message = f"[{now}] ({prefix}) {msg}" + self.write_message(log_message) + + def log_info(self, msg: str) -> None: + """Log an information.""" + if self.level >= INFO: + now = self.timestamp() + prefix = "II" + log_message = f"[{now}] ({prefix}) {msg}" + self.write_message(log_message) + + def log_debug(self, msg: str) -> None: + """Log a debug message.""" + if self.level >= DEBUG: + now = self.timestamp() + prefix = "DD" + log_message = f"[{now}] ({prefix}) {msg}" + self.write_message(log_message) + + def write_message(self, msg: str) -> None: + """Write the message.""" + print(msg) + if self.logfile == Path("/dev/stdout"): + return + with self.logfile.open("a+") as f: + f.write(msg) + + def timestamp(self) -> str: + """Log timestamp.""" + return datetime.datetime.now(tz=datetime.UTC).strftime( + "%Y-%m-%d %H:%M:%S", + )