1
0
Fork 0
containerctl/generate/container.py
2025-08-09 20:19:30 +02:00

857 lines
26 KiB
Python

#!/usr/bin/env python3
# SPDX-FileCopyrightText: (C) 2025 Enno Tensing <tenno+containerctl@suij.in>
#
# SPDX-License-Identifier: GPL-2.0-only
"""Util module holding Container and related objects."""
from dataclasses import dataclass
from typing import Self, TypeAlias
from log import Log
# Union of the value types accepted by the from_json constructors in this
# module for a configuration node.
ConfigValue: TypeAlias = str | dict | list | bool | None
class ConfigError(Exception):
    """Signal that a configuration is invalid.

    The reason is kept in ``message`` and is rendered with a
    ``Configuration error:`` prefix when the exception is converted to a
    string.
    """

    message: str

    def __init__(self, message: str = "") -> None:
        """Store *message* and forward it to ``Exception``."""
        super().__init__(message)
        self.message = message

    def __str__(self) -> str:
        """Return the message prefixed with ``Configuration error:``."""
        return "Configuration error: {}".format(self.message)
def maybe(json: dict, key: str) -> ConfigValue:
    """Return ``json[key]``, or None when the value cannot be looked up.

    Args:
        json: Parsed configuration node. Callers in this module pass values
            they have not validated, so anything that is not a dict is
            treated like a node without the key instead of raising.
        key: Name of the entry to fetch.

    Returns:
        The stored value, or None if *key* is absent or *json* is not a
        dict.
    """
    if not isinstance(json, dict):
        return None
    return json.get(key)
def maybe_or(json: dict, key: str, _or: ConfigValue) -> ConfigValue:
    """Look up *key* and fall back to *_or* for missing or mistyped values.

    The stored value is only returned when it is not None and is an
    instance of the same type as the fallback; otherwise *_or* is returned.
    """
    found = maybe(json, key)
    usable = found is not None and isinstance(found, type(_or))
    return found if usable else _or
def trim(s: str) -> str:
    """Remove sequential whitespace.

    A space directly after a tab is dropped, a tab followed by a
    backslash-newline continuation is turned into a single space, and any
    run of spaces is collapsed into one space.

    Args:
        s: Text to normalise.

    Returns:
        The normalised text.
    """
    s = s.replace("\t ", "\t")
    s = s.replace("\t\\\n", " ")
    # Look for *two* spaces: testing for a single space and replacing it
    # with itself would never terminate once the text contains a space.
    while "  " in s:
        s = s.replace("  ", " ")
    return s
@dataclass
class Cgroup:
"""cgroup Config."""
config: list
parent: str
namespace: str
how: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], "", "", "")
if not isinstance(val, dict):
logger.log_warning("cgroup Config is invalid!")
return cls([], "", "", "")
config = maybe_or(val, "config", [])
parent = maybe_or(val, "parent", "")
namespace = maybe_or(val, "namespace", "")
how = maybe_or(val, "how", "")
if how == "split" and parent != "":
logger.log_warning(
"Split cgroups can not be combined with a cgroup parent!"
)
parent = ""
return cls(config, parent, namespace, how)
def command(self) -> str:
"""Option for podman container create."""
cmd = ""
seperator = " \\\n"
cmd += seperator.join(
[f"\t--cgroup-conf={option}{seperator}" for option in self.config]
)
if self.parent != "":
cmd += f"\t--cgroup-parent={self.parent}{seperator}"
if self.namespace != "":
cmd += f"\t--cgroupns={self.namespace}{seperator}"
if self.how != "":
cmd += f"\t--cgroups={self.how}{seperator}"
return cmd
@dataclass
class Cpu:
"""CPU Accounting."""
period: str
quota: str
shares: str
number: str
cpuset_cpus: str
cpuset_mems: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls("", "", "", "", "", "")
if not isinstance(val, dict):
logger.log_warning("cpu Config is invalid!")
return cls("", "", "", "", "", "")
period = maybe_or(val, "period", "")
quota = maybe_or(val, "quota", "")
shares = maybe_or(val, "shares", "")
number = maybe_or(val, "number", "")
cpuset = maybe_or(val, "cpuset", {})
cpuset_cpus = ""
cpuset_mems = ""
if len(cpuset) != 0:
cpuset_cpus += maybe_or(cpuset, "cpus", "")
cpuset_mems += maybe_or(cpuset, "mems", "")
return cls(
period,
quota,
shares,
number,
cpuset_cpus,
cpuset_mems,
)
def command(self) -> str:
"""Option for podman container create."""
cmd = ""
seperator = " \\\n"
if self.period != "":
cmd += f"\t--cpu-period={self.period}{seperator}"
if self.quota != "":
cmd += f"\t--cpu-quota={self.quota}{seperator}"
if self.shares != "":
cmd += f"\t--cpu-shares={self.shares}{seperator}"
if self.number != "":
cmd += f"\t--cpus={self.number}{seperator}"
if self.cpuset_cpus != "":
cmd += f"\t--cpuset-cpus={self.cpuset_cpus}{seperator}"
if self.cpuset_mems != "":
cmd += f"\t--cpuset-mems={self.cpuset_mems}{seperator}"
return cmd
@dataclass
class Memory:
"""Memory accounting."""
limit: str
reservation: str
swap: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls("", "", "")
if not isinstance(val, dict):
logger.log_warning("memory Config is invalid!")
return cls("", "", "")
limit = maybe_or(val, "limit", "")
reservation = maybe_or(val, "reservation", "")
swap = maybe_or(val, "swap", "")
if limit == "":
return cls("", "", "")
return cls(limit, reservation, swap)
def command(self) -> str:
"""Option for podman container create."""
if self.limit == "":
return ""
cmd = ""
seperator = " \\\n"
cmd += f"\t--memory={self.limit}{seperator}"
if self.reservation != "":
cmd += f"\t--memory-reservation={self.reservation}{seperator}"
if self.swap != "":
cmd += f"\t--memory-swap={self.swap}{seperator}"
return cmd
@dataclass
class Accounting:
"""Resource Accounting."""
cgroup: Cgroup
cpu: Cpu
memory: Memory
@classmethod
def from_json(cls, data: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if data is None:
return cls(
Cgroup.from_json(None, logger),
Cpu.from_json(None, logger),
Memory.from_json(None, logger),
)
cgroup_data = maybe(data, "cgroup")
cpu_data = maybe(data, "cpu")
memory_data = maybe(data, "memory")
cgroup = Cgroup.from_json(cgroup_data, logger)
cpu = Cpu.from_json(cpu_data, logger)
memory = Memory.from_json(memory_data, logger)
return cls(cgroup, cpu, memory)
def command(self) -> str:
"""Options for podman container create."""
cgroup = self.cgroup.command()
cpu = self.cpu.command()
memory = self.memory.command()
return cgroup + cpu + memory
@dataclass
class Volume:
"""Container Volume."""
name: str
path: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> list:
"""Create from JSON."""
if val is None:
return []
if not isinstance(val, dict):
logger.log_warning("Volume key is present, but malformed.")
return []
return [
Volume.from_json_entry(key, value) for key, value in val.items()
]
@classmethod
def from_json_entry(cls, key: str, value: str | list) -> Self:
"""Create from JSON entry."""
if isinstance(value, str):
return cls(key, [value])
return cls(key, value)
def is_host_volume(self) -> bool:
"""Check if this Volume is a named or a host volume."""
return self.name.startswith("/") or self.name.startswith(".")
def command(self) -> str:
"""Option for podman container create."""
cmd = ""
for path in self.path:
cmd += f"\t--volume {self.name}:{path} \\\n"
return cmd
def create(self) -> str:
"""Create volume, if it does not exist."""
if self.is_host_volume():
# A HOST-DIR starting with a slash or dot is interpreted as a
# directory or file on the host machine. Since the script should
# not modify things outside its own tree, i.e.
# /var/lib/containerctl, ignore it.
return ""
cmd = f"# Create volume {self.name}\n"
cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
cmd += "then\n"
cmd += f"\tpodman volume create '{self.name}'\n"
cmd += "fi\n"
return cmd
def remove(self) -> str:
"""Remove volume if it exists."""
if self.is_host_volume():
# As with create, don't touch the host system
return ""
cmd = f"# Remove volume {self.name}\n"
cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
cmd += "then\n"
cmd += f"\tpodman volume rm '{self.name}'\n"
cmd += "fi\n"
return cmd
@dataclass
class Secret:
"""Container Secret."""
name: str
secret_type: str
target: list
options: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> list:
"""Create from JSON."""
if val is None:
return []
if not isinstance(val, dict):
logger.log_warning("Secret key is present, but malformed!")
return []
secrets = []
for key in val:
if not isinstance(val[key], dict):
logger.log_warning(f"Secret {key} is malformed!")
continue
name = key
secret_type = maybe_or(val[key], "type", "")
target = maybe(val[key], "target")
if isinstance(target, str):
target = [target]
if not isinstance(target, list):
target = []
options = maybe_or(val[key], "options", "")
if options is None:
options = ""
secrets.append(cls(name, str(secret_type), target, str(options)))
return secrets
def command(self) -> str:
"""Option for podman container create."""
cmd = ""
for target in self.target:
cmd += (
f"\t--secret {self.name},:"
f"type={self.secret_type},"
f"target={target}"
)
# Not a password, ruff...
has_option = self.secret_type == "mount" # noqa: S105
has_option = has_option and self.options != ""
if has_option:
cmd = f"{cmd},{self.options}"
cmd += " \\\n"
return cmd
def create(self) -> str:
"""Create secret if it does not exist."""
cmd = f"# Create secret {self.name}\n"
cmd += f"if ! podman secret exists '{self.name}' 2> /deb/null\n"
cmd += "then\n"
cmd += "\thead /dev/urandom \\\n"
cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
cmd += "\t\t| head -c 128 \\\n"
cmd += f"\t\t| podman secret create '{self.name}' -\n"
cmd += "fi\n"
return cmd
def remove(self) -> str:
"""Remove secret if it exists."""
cmd = f"# Remove secret {self.name}\n"
cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
cmd += "then\n"
cmd += f"\tpodman secret rm '{self.name}'\n"
cmd += "fi\n"
return cmd
@dataclass
class Environment:
"""Container environment."""
variables: list
file: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], "")
if not isinstance(val, dict):
logger.log_warning("Environment key is present, but malformed!")
return cls([], "")
return cls([f"{key}='{value}'" for key, value in val.items()], "")
def command(self) -> str:
"""Option for podman container create."""
if len(self.variables) == 0:
return ""
return f"\t--env-file={self.file} \\\n"
def create(self) -> str:
"""Create env file."""
cmd = ""
header = f"# Create env-file {self.file}\n"
for var in self.variables:
escaped_var = var.replace("'", "%b")
cmd += f"printf '{escaped_var}\\n' \"'\" \"'\" >> '{self.file}'\n"
if cmd == "":
return ""
return header + f"printf '\\n' > {self.file}\n" + cmd
def remove(self) -> str:
"""Remove env file."""
cmd = f"# Remove env-file {self.file}\n"
cmd += f"if [ -e '{self.file}' ]\n"
cmd += "then\n"
cmd += f"\trm '{self.file}'\n"
cmd += "fi\n"
return cmd
@dataclass
class Ports:
"""Container Ports."""
tcp_ports: list
udp_ports: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], [])
if not isinstance(val, dict):
logger.log_warning("Ports key is present, but malformed!")
return cls([], [])
tcp_ports = maybe(val, "tcp")
udp_ports = maybe(val, "udp")
if tcp_ports is None:
tcp_ports = []
if udp_ports is None:
udp_ports = []
if not isinstance(tcp_ports, list) and not isinstance(udp_ports, list):
logger.log_warning("Port configuration is malformed!")
return cls([], [])
if not isinstance(tcp_ports, list):
logger.log_warning("tcp_ports configuration is malformed!")
return cls([], udp_ports)
if not isinstance(udp_ports, list):
logger.log_warning("udp_ports configuration is malformed!")
return cls(tcp_ports, [])
return cls(tcp_ports, udp_ports)
def command(self) -> str:
"""Option for podman container create."""
seperator = " \\\n"
tcp_ports = seperator.join(
[f"\t--publish {port}/tcp" for port in self.tcp_ports]
)
udp_ports = seperator.join(
[f"\t--publish {port}/udp" for port in self.udp_ports]
)
if tcp_ports == "" and udp_ports == "":
return ""
if tcp_ports == "":
return udp_ports + seperator
if udp_ports == "":
return tcp_ports + seperator
return tcp_ports + seperator + udp_ports + seperator
@dataclass
class Network:
"""Container Network."""
mode: str
options: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls("", [])
if not isinstance(val, dict):
logger.log_warning("Network configuration is malformed!")
return cls("", [])
mode = maybe(val, "mode")
options = maybe(val, "options")
if mode is None:
err = "Network configuration is missing or has malformed elements!"
logger.log_error(err)
return cls("", [])
if options is None or not isinstance(options, list):
return cls(str(mode), [])
return cls(str(mode), options)
def command(self) -> str:
"""Option for podman container create."""
if self.mode == "":
return ""
cmd = f"\t--network={self.mode}"
opts = ",".join(self.options)
if opts != "":
cmd += f":{opts}"
return cmd + " \\\n"
@dataclass
class Image:
"""Container Image."""
registry: str
image: str
tag: str
cmd: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("Image key either not present or malformed!")
return None
registry = maybe_or(val, "registry", "")
image = maybe_or(val, "image", "")
tag = maybe_or(val, "tag", "")
cmd = maybe_or(val, "command", "")
return cls(str(registry), str(image), str(tag), cmd)
def command(self) -> str:
"""Option for podman container create."""
if self.cmd != "":
return f"{self.registry}/{self.image}:{self.tag} {self.cmd}"
return f"{self.registry}/{self.image}:{self.tag}"
@dataclass
class Capability:
    """Container Capability.

    Attributes:
        cap: Name of the capability.
        mode: ``add`` or ``drop``.
    """

    cap: str
    mode: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list:
        """Create from JSON.

        Args:
            val: Dict with optional ``add`` and ``drop`` entries, each a
                list of capability names or a single name.
            logger: Receives a warning for malformed input.

        Returns:
            The capabilities to add followed by those to drop. Either key
            may be omitted.
        """
        if val is None:
            return []
        if not isinstance(val, dict):
            logger.log_warning("Capabilities key is malformed!")
            return []
        capabilities = []
        for mode in ("add", "drop"):
            names = val.get(mode)
            if names is None:
                # Only one of the two lists may be configured.
                continue
            if isinstance(names, str):
                # A bare string is one capability, not a sequence of
                # single-character names.
                names = [names]
            if not isinstance(names, list):
                logger.log_warning(f"Capabilities {mode} key is malformed!")
                continue
            capabilities.extend(cls(name, mode) for name in names)
        return capabilities

    def command(self) -> str:
        """Option for podman container create."""
        return f"--cap-{self.mode}={self.cap}"
@dataclass
class Dns:
"""Container DNS."""
servers: list
search: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], "")
if not isinstance(val, dict):
logger.log_warning("DNS Key is malformed!")
return cls([], "")
search = maybe_or(val, "search", "")
servers = maybe(val, "servers")
if not isinstance(servers, list):
logger.log_error("Servers key is not an array!")
return cls([], "")
return cls(servers, str(search))
def command(self) -> str:
"""Option for podman container create."""
if len(self.servers) == 0 and self.search == "":
return ""
if len(self.servers) == 0:
return f"\t--dns-search={self.search} \\\n"
if self.search == "":
return f"\t--dns={','.join(self.servers)} \\\n"
cmd = f"\t--dns-search={self.search} \\\n\t--dns="
cmd += ",".join(self.servers)
return cmd
@dataclass
class ContainerOptions:
"""Container-Meta settings."""
privileged: bool = False
read_only: bool = False
replace: bool = False
restart: str = "no"
pull_policy: str = "always"
timezone: str = "local"
is_valid: bool = False
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
# Should not happen!
return cls()
if not isinstance(val, dict):
logger.log_error("Container config is invalid!")
return cls()
privileged = maybe_or(val, "privileged", _or=False)
read_only = maybe_or(val, "read_only", _or=False)
replace = maybe_or(val, "replace", _or=False)
restart = maybe_or(val, "restart", "no")
pull_policy = maybe_or(val, "pull_policy", "always")
timezone = maybe_or(val, "timezone", "local")
return cls(
privileged,
read_only,
replace,
restart,
pull_policy,
timezone,
is_valid=True,
)
def command(self) -> str:
"""Option for podman conainter create."""
cmd = ""
if self.privileged:
cmd += "\t--privileged \\\n"
if self.read_only:
cmd += "\t--read-only \\\n"
if self.replace:
cmd += "\t--replace \\\n"
if self.restart != "":
cmd += f"\t--restart={self.restart} \\\n"
if self.pull_policy != "":
cmd += f"\t--pull-policy={self.pull_policy} \\\n"
if self.timezone != "":
cmd += f"\t--tz={self.timezone} \\\n"
return ""
@dataclass
class ContainerNetwork:
"""Wrapper for Network, Dns and Ports."""
network: Network
dns: Dns
ports: Ports
@classmethod
def from_json(cls, json: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
network_config = maybe(json, "network")
dns_config = maybe(json, "dns")
ports_config = maybe(json, "ports")
network = Network.from_json(network_config, logger)
dns = Dns.from_json(dns_config, logger)
ports = Ports.from_json(ports_config, logger)
return cls(network, dns, ports)
def command(self) -> str:
"""Option for podman container create."""
cmd = ""
cmd += self.network.command()
cmd += self.dns.command()
cmd += self.ports.command()
return cmd
class Container:
    """A container definition and the shell snippets to manage it.

    Every method returns shell text; nothing is executed here.
    """

    name: str
    image: Image
    ct_opts: ContainerOptions
    ct_network: ContainerNetwork
    env: Environment
    secrets: list
    volumes: list
    capabilities: list
    accounting: Accounting

    def __init__(self, json: dict, logger: Log | None = None) -> None:
        """Populate the container from its parsed JSON description.

        Without a logger, one writing to /dev/stdout is created. If the
        name or image key is missing, or the container options are not
        valid, the constructor returns early and no attribute is assigned.
        """
        if logger is None:
            logger = Log("/dev/stdout")

        name = maybe(json, "name")
        if name is None:
            logger.log_error("No container name set, aborting!")
            return

        image = maybe(json, "image")
        if image is None:
            logger.log_error("No image set, aborting!")
            return

        ct_opts = ContainerOptions.from_json(json, logger)
        if not ct_opts.is_valid:
            return

        self.name = str(name)
        self.image = Image.from_json(image, logger)
        self.ct_opts = ct_opts
        self.ct_network = ContainerNetwork.from_json(json, logger)
        self.env = Environment.from_json(maybe(json, "env"), logger)
        # The env-file is named after the container.
        self.env.file = f"/var/lib/containerctl/environment-files/{self.name}"
        self.secrets = Secret.from_json(maybe(json, "secrets"), logger)
        self.volumes = Volume.from_json(maybe(json, "volumes"), logger)
        self.capabilities = Capability.from_json(
            maybe(json, "capabilities"), logger
        )
        self.accounting = Accounting.from_json(
            maybe(json, "accounting"), logger
        )

    def create_volumes(self) -> str:
        """Concatenate the create snippets of all volumes."""
        if self.volumes is None:
            return ""
        return "".join(volume.create() for volume in self.volumes)

    def create_secrets(self) -> str:
        """Concatenate the create snippets of all secrets."""
        if self.secrets is None:
            return ""
        return "".join(secret.create() for secret in self.secrets)

    def create_environment(self) -> str:
        """Return the env-file create snippet, if there is an environment."""
        return "" if self.env is None else self.env.create()

    def create_container(self) -> str:
        """Assemble the full ``podman container create`` invocation."""
        pieces = [
            f"# Create container {self.name}\n",
            "podman container create \\\n",
            f"\t--name={self.name} \\\n",
            f"{self.ct_opts.command()}",
            f"{self.ct_network.command()}",
            f"{self.env.command()}",
        ]
        pieces.extend(f"{secret.command()}" for secret in self.secrets)
        pieces.extend(f"{volume.command()}" for volume in self.volumes)
        pieces.extend(
            f"\t{capability.command()} \\\n"
            for capability in self.capabilities
        )
        pieces.append(f"{self.accounting.command()}")
        # The image reference comes last and ends the command.
        pieces.append(f"\t{self.image.command()}\n")
        return "".join(pieces)

    def start_container(self) -> str:
        """Return the ``podman container start`` snippet."""
        return (
            f"# Start container {self.name}\n"
            f"podman container start {self.name}\n"
        )

    def stop_container(self) -> str:
        """Return the ``podman container stop`` snippet."""
        return (
            f"# Stop container {self.name}\n"
            f"podman container stop {self.name}\n"
        )

    def restart_container(self) -> str:
        """Return a stop, a five second sleep and a start."""
        return "".join(
            (
                f"# Restart container {self.name}\n",
                self.stop_container(),
                "sleep 5s\n",
                self.start_container(),
            )
        )

    def upgrade_container(self) -> str:
        """Return the snippet that removes, recreates and starts it.

        The container and its env-file are removed, then the env-file and
        the container are created again and the container is started.
        """
        sections = [
            f"# Upgrade container {self.name}\n",
            self.remove_container(),
            self.remove_environment(),
            "",
            self.create_environment(),
            self.create_container(),
            "",
            self.start_container(),
        ]
        return "\n".join(sections)

    def purge_container(self) -> str:
        """Return the snippet removing the container and all its data."""
        sections = [
            self.remove_container(),
            self.remove_volumes(),
            self.remove_secrets(),
            self.remove_environment(),
        ]
        return "\n".join(sections)

    def remove_container(self) -> str:
        """Return a stop followed by ``podman container rm``."""
        removal = (
            f"\n# Remove container {self.name}\n"
            f"podman container rm {self.name}\n"
        )
        return self.stop_container() + removal

    def remove_volumes(self) -> str:
        """Concatenate the remove snippets of all volumes."""
        if self.volumes is None:
            return ""
        return "".join(volume.remove() for volume in self.volumes)

    def remove_secrets(self) -> str:
        """Concatenate the remove snippets of all secrets."""
        if self.secrets is None:
            return ""
        return "".join(secret.remove() for secret in self.secrets)

    def remove_environment(self) -> str:
        """Return the env-file remove snippet, if there is an environment."""
        return "" if self.env is None else self.env.remove()