1
0
Fork 0
containerctl/generate/container.py
Enno Tensing 3f5ccffc4a
generate: container: Add {create,remove}_environment wrappers
Add {create,remove}_environment wrappers to the Container class, so that
the generate script can access them.

Signed-off-by: Enno Tensing <tenno@suij.in>
2025-07-22 13:17:35 +02:00

555 lines
17 KiB
Python

#!/usr/bin/env python3
"""Util module holding Container and related objects."""
from dataclasses import dataclass
from pathlib import Path
from typing import Self, TypeAlias
from log import Log
ConfigValue: TypeAlias = str | dict | list | bool | None
class ConfigError(Exception):
    """Raised for an invalid configuration.

    Attributes:
        message: Human-readable description of the problem.
    """

    message: str

    def __init__(self, message: str = "") -> None:
        """Store *message* and initialise the base Exception with it."""
        self.message = message
        # ``super`` is a builtin, not an instance attribute; the previous
        # ``self.super.__init__`` raised AttributeError on construction.
        super().__init__(message)

    def __str__(self) -> str:
        """Return the message prefixed with ``Configuration error:``."""
        return f"Configuration error: {self.message}"
def join(data: list, separator: str = ",") -> str:
    """Join the items of *data* into a single string.

    Items keep their original order (the previous pop()-based loop emitted
    them back to front) and are converted with str(), so non-string items
    such as port numbers are accepted. *data* is not modified.

    Args:
        data: Items to join.
        separator: Text placed between two neighbouring items.

    Returns:
        The joined string, or "" when *data* is empty.
    """
    return separator.join(str(item) for item in data)
def maybe(json: dict, key: str) -> str | dict | list | bool | None:
"""Maybe get a value."""
try:
return json[key]
except KeyError:
return None
def command_join(items: list | None) -> str:
"""Like join(), but call command()."""
joined = ""
if items is not None:
for item in items:
joined += " " + item.command()
return joined
def trim(s: str) -> str:
    """Remove sequential whitespace.

    Three passes are applied in order:

    1. a space directly following a tab is dropped,
    2. a tab + backslash + newline sequence (an otherwise empty
       continuation line) is replaced by a single space,
    3. runs of spaces are collapsed into one space.

    Args:
        s: Text to clean up.

    Returns:
        The cleaned text.
    """
    s = s.replace("\t ", "\t")
    s = s.replace("\t\\\n", " ")
    # Collapse on *double* spaces: testing for a single space and replacing
    # it with itself would never terminate.
    while "  " in s:
        s = s.replace("  ", " ")
    return s
@dataclass
class Volume:
    """Container Volume.

    Attributes:
        name: Name of the podman volume.
        path: Second half of the ``--volume name:path`` option.
    """

    name: str
    path: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create volumes from a JSON object mapping name to path.

        Args:
            val: Parsed JSON value of the volume key.
            logger: Receives a warning when *val* is not an object.

        Returns:
            One Volume per entry, or None when *val* is None or malformed.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Volume key is present, but malformed.")
            return None
        return [cls(name, path) for name, path in val.items()]

    def command(self) -> str:
        """Option for podman container create."""
        return f"--volume {self.name}:{self.path}"

    def create(self) -> str:
        """Return a shell snippet creating the volume if it is missing."""
        cmd = f"# Create volume {self.name}\n"
        cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        # This line needs the f-prefix, otherwise the script contains the
        # literal text "{self.name}" instead of the volume name.
        cmd += f"\tpodman volume create '{self.name}'\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Return a shell snippet removing the volume if it exists."""
        cmd = f"# Remove volume {self.name}\n"
        cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman volume rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd
@dataclass
class Secret:
    """Container Secret.

    Attributes:
        name: Name of the podman secret.
        secret_type: Value emitted as ``type=`` in the --secret option.
        target: Value emitted as ``target=`` in the --secret option.
        options: Extra comma-separated options, "" when there are none.
    """

    name: str
    secret_type: str
    target: str
    options: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create secrets from a JSON object keyed by secret name.

        Each entry is expected to be an object with the keys ``type``,
        ``target`` and optionally ``options``. Entries that are not
        objects are skipped with a warning.

        Args:
            val: Parsed JSON value of the secrets key.
            logger: Receives warnings about malformed input.

        Returns:
            A list of Secret objects, or None when *val* is None or not
            an object.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Secret key is present, but malformed!")
            return None
        secrets = []
        for name, entry in val.items():
            if not isinstance(entry, dict):
                logger.log_warning(f"Secret {name} is malformed!")
                continue
            options = entry.get("options")
            if options is None:
                options = ""
            # NOTE: a missing "type" or "target" is stored as the string
            # "None", exactly as before.
            secrets.append(
                cls(
                    name,
                    str(entry.get("type")),
                    str(entry.get("target")),
                    str(options),
                )
            )
        return secrets

    def command(self) -> str:
        """Option for podman container create.

        The extra options are appended only for secrets of type
        ``mount``. The check used to look at ``target``, which holds the
        target rather than the type, so options were never emitted.
        """
        cmd = (
            f"--secret {self.name},type={self.secret_type}"
            f",target={self.target}"
        )
        if self.secret_type == "mount" and self.options != "":
            cmd = f"{cmd},{self.options}"
        return cmd

    def create(self) -> str:
        """Return a shell snippet creating the secret if it is missing.

        The secret value is piped from /dev/urandom, filtered to
        alphanumeric characters and cut to 128 characters.
        """
        cmd = f"# Create secret {self.name}\n"
        # stderr goes to /dev/null (was misspelled /deb/null, which makes
        # the redirection itself fail).
        cmd += f"if ! podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += "\thead /dev/urandom \\\n"
        cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
        cmd += "\t\t| head -c 128 \\\n"
        cmd += f"\t\t| podman secret create '{self.name}' -\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Return a shell snippet removing the secret if it exists."""
        cmd = f"# Remove secret {self.name}\n"
        cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman secret rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd
@dataclass
class Environment:
"""Container environment."""
variables: list
file: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], "")
if not isinstance(val, dict):
logger.log_warning("Environment key is present, but malformed!")
return cls([], "")
return cls([f"{key}='{value}'" for key, value in val.items()], "")
def command(self) -> str:
"""Option for podman container create."""
return f"--env-file={self.file}"
def create(self) -> str:
"""Create env file."""
cmd = f"# Create env-file {self.file}\n"
for var in self.variables:
escaped_var = var.replace("'", "%b")
cmd += f"printf '{escaped_var}\\n' \"'\" >> '{self.file}'\n"
return cmd
def remove(self) -> str:
"""Remove env file."""
cmd = f"# Remove env-file {self.file}\n"
cmd += f"if [ -e '{self.file}' ]\n"
cmd += "then\n"
cmd += "\trm '{self.file}'\n"
cmd += "fi\n"
return cmd
@dataclass
class Ports:
"""Container Ports."""
tcp_ports: list
udp_ports: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], [])
if not isinstance(val, dict):
logger.log_warning("Ports key is present, but malformed!")
return cls([], [])
tcp_ports = maybe(val, "tcp")
udp_ports = maybe(val, "udp")
if not isinstance(tcp_ports, list):
logger.log_warning("Key tcp_ports is not an array!")
return cls([], [])
if not isinstance(udp_ports, list):
logger.log_warning("Key udp_ports is not an array!")
return cls([], [])
return cls(tcp_ports, udp_ports)
def command(self) -> str:
"""Option for podman container create."""
ports = ""
for port in self.tcp_ports:
ports += f"\t--port {port}/tcp \\\n"
for port in self.udp_ports:
ports += f"\t--port {port}/udp \\\n"
return ports
@dataclass
class Network:
"""Container Network."""
mode: str
options: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("Network configuration is missing or malformed!")
return cls("", [])
mode = maybe(val, "mode")
options = maybe(val, "options")
if mode is None or options is None or not isinstance(options, list):
err = "Network configuration is missing or has malformed elements!"
logger.log_error(err)
return cls("", [])
return cls(str(mode), options)
def command(self) -> str:
"""Option for podman container create."""
if self.mode == "":
return ""
cmd = f"--network={self.mode}"
opts = join(self.options)
if opts != "":
cmd += f":{opts}"
return cmd
@dataclass
class Image:
"""Container Image."""
registry: str
image: str
tag: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("Image key either not present or malformed!")
return None
registry = maybe(val, "registry")
image = maybe(val, "image")
tag = maybe(val, "tag")
return cls(str(registry), str(image), str(tag))
def command(self) -> str:
"""Option for podman container create."""
return f"{self.registry}/{self.image}:{self.tag}"
@dataclass
class Capability:
    """Container Capability.

    Attributes:
        cap: Capability name.
        mode: Either "add" or "drop"; selects --cap-add or --cap-drop.
    """

    cap: str
    mode: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create from a JSON object with ``add`` and/or ``drop`` arrays.

        Both keys are optional; a missing one simply contributes nothing
        (it used to raise KeyError). A key that is present but not an
        array is skipped with a warning instead of being iterated
        character by character.

        Args:
            val: Parsed JSON value of the capabilities key.
            logger: Receives warnings about missing or malformed input.

        Returns:
            Added capabilities followed by dropped ones, or None when
            *val* is None or not an object.
        """
        if val is None or not isinstance(val, dict):
            logger.log_warning(
                "Capabilities key is either missing or malformed!"
            )
            return None
        capabilities = []
        for mode in ("add", "drop"):
            names = val.get(mode)
            if names is None:
                continue
            if not isinstance(names, list):
                logger.log_warning(f"Capabilities key {mode} is not an array!")
                continue
            capabilities.extend(cls(name, mode) for name in names)
        return capabilities

    def command(self) -> str:
        """Option for podman container create."""
        return f"--cap-{self.mode}={self.cap}"
@dataclass
class Dns:
"""Container DNS."""
servers: list
search: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("DNS Key is either missing or malformed!")
return cls([], "")
search = maybe(val, "search")
servers = maybe(val, "servers")
if not isinstance(servers, list):
logger.log_error("Servers key is not an array!")
return cls([], "")
return cls(servers, str(search))
def command(self) -> str:
"""Option for podman container create."""
if len(self.servers) == 0 and self.serach == "":
return ""
if len(self.servers) == 0:
return f"--dns-search={self.search}"
if self.search == "":
return f"--dns={join(self.servers)}"
cmd = f"--dns-search={self.search} \\\n\t--dns="
cmd += join(self.servers)
return cmd
class Container:
    """Container.

    Built from a JSON config object; every public method returns a piece
    of shell script (as a string) that performs the named podman action.
    Nothing is executed here.
    """

    name: str
    image: Image
    privileged: bool
    read_only: bool
    replace: bool
    restart: str
    pull_policy: str
    network: Network
    dns: Dns
    ports: Ports
    env: Environment | None
    secrets: list | None
    volumes: list | None
    capabilities: list | None

    def __init__(self, json: dict, logger: Log | None = None) -> None:
        """Create from JSON.

        Args:
            json: Parsed container configuration. ``name`` and ``image``
                are required; ``pull_policy`` defaults to "always" and
                ``restart`` to "no".
            logger: Log sink; a Log writing to /dev/stdout is created
                when omitted.

        NOTE: when ``name`` or ``image`` is missing, an error is logged
        and the constructor returns early, leaving the instance without
        any attributes set.
        """
        if logger is None:
            logger = Log("/dev/stdout")
        name = maybe(json, "name")
        if name is None:
            logger.log_error("No container name set, aborting!")
            return
        image = maybe(json, "image")
        if image is None:
            logger.log_error("No image set, aborting!")
            return
        pull_policy = maybe(json, "pull_policy")
        if pull_policy is None:
            pull_policy = "always"
        restart = maybe(json, "restart")
        if restart is None:
            restart = "no"
        self.name = str(name)
        self.image = Image.from_json(image, logger)
        # bool(None) is False, so absent flags default to off.
        self.privileged = bool(maybe(json, "privileged"))
        self.read_only = bool(maybe(json, "read_only"))
        self.replace = bool(maybe(json, "replace"))
        self.pull_policy = str(pull_policy)
        self.restart = str(restart)
        self.network = Network.from_json(maybe(json, "network"), logger)
        self.dns = Dns.from_json(maybe(json, "dns"), logger)
        self.ports = Ports.from_json(maybe(json, "ports"), logger)
        self.env = Environment.from_json(maybe(json, "env"), logger)
        self.env.file = "/var/lib/containerctl/containers/"
        self.env.file += f"{self.name}/environment"
        self.secrets = Secret.from_json(maybe(json, "secrets"), logger)
        self.volumes = Volume.from_json(maybe(json, "volumes"), logger)
        self.capabilities = Capability.from_json(
            maybe(json, "capabilities"), logger
        )

    def create(self) -> str:
        """Generate podman container create command.

        Options are emitted one per line, joined by shell line
        continuations; the image reference comes last. Empty network/dns
        options are skipped, and secrets, volumes and capabilities may be
        None.
        """
        # The comment line must end in a newline, otherwise the command
        # itself becomes part of the comment.
        cmd = f"# Create container {self.name}\n"
        cmd += "podman container create \\\n"
        cmd += f"\t--name={self.name} \\\n"
        if self.privileged:
            cmd += "\t--privileged \\\n"
        if self.replace:
            cmd += "\t--replace \\\n"
        if self.read_only:
            cmd += "\t--read-only \\\n"
        cmd += f"\t--restart={self.restart} \\\n"
        cmd += f"\t--pull={self.pull_policy} \\\n"
        for option in (self.network.command(), self.dns.command()):
            if option != "":
                cmd += f"\t{option} \\\n"
        # Ports.command() already yields complete lines.
        cmd += self.ports.command()
        if self.env is not None:
            cmd += f"\t{self.env.command()} \\\n"
        # Each from_json() may return None; treat that as "no entries".
        for group in (self.secrets, self.volumes, self.capabilities):
            for item in group or []:
                cmd += f"\t{item.command()} \\\n"
        cmd += f"\t{self.image.command()}\n"
        return cmd

    def create_volumes(self) -> str:
        """Generate podman volume create commands."""
        if self.volumes is None:
            return ""
        return "".join(volume.create() for volume in self.volumes)

    def create_secrets(self) -> str:
        """Generate podman secret create commands."""
        if self.secrets is None:
            return ""
        return "".join(secret.create() for secret in self.secrets)

    def create_environment(self) -> str:
        """Wrap self.env.create(); "" when there is no environment."""
        if self.env is None:
            return ""
        return self.env.create()

    def create_container(self) -> str:
        """Generate env-file creation followed by the create command."""
        return self.create_environment() + self.create()

    def start_container(self) -> str:
        """Generate podman container start command."""
        cmd = f"# Start container {self.name}\n"
        cmd += f"podman container start {self.name}\n"
        return cmd

    def stop_container(self) -> str:
        """Generate podman container stop command."""
        cmd = f"# Stop container {self.name}\n"
        cmd += f"podman container stop {self.name}\n"
        return cmd

    def restart_container(self) -> str:
        """Generate podman container restart composite command.

        The container is stopped, the script sleeps five seconds, then
        the container is started again.
        """
        cmd = f"# Restart container {self.name}\n"
        cmd += self.stop_container()
        cmd += "sleep 5s\n"
        cmd += self.start_container()
        return cmd

    def upgrade_container(self) -> str:
        """Generate podman container upgrade composite command.

        Removes the container and its env file, then re-creates and
        starts it.
        """
        comment = f"# Upgrade container {self.name}\n"
        remove_container = self.remove_container()
        # Goes through the wrapper so a missing environment is tolerated.
        remove_env_file = self.remove_environment()
        create_container = self.create_container()
        start_container = self.start_container()
        remove = f"{remove_container}\n{remove_env_file}\n"
        create_and_start = create_container + "\n" + start_container
        return f"{comment}\n{remove}\n{create_and_start}"

    def purge_container(self) -> str:
        """Generate podman container purge composite command.

        Removes the container, then its volumes, secrets and env file.
        """
        remove_container = self.remove_container()
        remove_volumes = self.remove_volumes()
        remove_secrets = self.remove_secrets()
        remove_env_file = self.remove_environment()
        remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}"
        return f"{remove_container}\n{remove_data}"

    def remove_container(self) -> str:
        """Generate podman container stop + remove commands."""
        cmd = self.stop_container()
        cmd += f"\n# Remove container {self.name}\n"
        cmd += f"podman container rm {self.name}\n"
        return cmd

    def remove_volumes(self) -> str:
        """Generate podman volume remove commands."""
        if self.volumes is None:
            return ""
        return "".join(volume.remove() for volume in self.volumes)

    def remove_secrets(self) -> str:
        """Generate podman secret remove commands."""
        if self.secrets is None:
            return ""
        return "".join(secret.remove() for secret in self.secrets)

    def remove_environment(self) -> str:
        """Wrap self.env.remove(); "" when there is no environment."""
        if self.env is None:
            return ""
        return self.env.remove()