When creating the env-vars file, printf is called with its arguments in single quotes to prevent variable and sub-shell expansion. However, only one single quote was passed as an argument to printf, although two format specifiers are present in the escaped string. Fix this by adding another \"'\" to the variable cmd. Signed-off-by: Enno Tensing <tenno@suij.in>
536 lines
17 KiB
Python
536 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# SPDX-FileCopyrightText: (C) 2025 Enno Tensing <tenno+containerctl@suij.in>
|
|
#
|
|
# SPDX-License-Identifier: GPL-2.0-only
|
|
|
|
"""Util module holding Container and related objects."""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Self, TypeAlias
|
|
|
|
from log import Log
|
|
|
|
# Union of the value types that a lookup in the JSON configuration may yield;
# used as the type of the ``val`` argument of the ``from_json`` constructors.
ConfigValue: TypeAlias = str | dict | list | bool | None
|
|
|
|
|
|
class ConfigError(Exception):
    """Exception signalling an invalid configuration."""

    # Human-readable description of what is wrong with the configuration.
    message: str

    def __init__(self, message: str = "") -> None:
        """Initialise the exception and remember *message*."""
        super().__init__(message)
        self.message = message

    def __str__(self) -> str:
        """Render the message prefixed with 'Configuration error: '."""
        return "Configuration error: {}".format(self.message)
|
|
|
|
|
|
def maybe(json: dict, key: str) -> str | dict | list | bool | None:
|
|
"""Maybe get a value."""
|
|
try:
|
|
return json[key]
|
|
except KeyError:
|
|
return None
|
|
|
|
|
|
def trim(s: str) -> str:
    """Remove sequential whitespace.

    Three passes are applied in order:

    1. a single space directly after a tab is dropped,
    2. a tab followed by a backslash-newline continuation becomes one space,
    3. every run of two or more spaces is collapsed into a single space.

    Returns the cleaned-up string; an empty string is returned unchanged.
    """
    s = s.replace("\t ", "\t")
    s = s.replace("\t\\\n", " ")
    # Look for *two* spaces: testing for a single space and replacing it with
    # itself would never terminate once the string contains any space.
    while "  " in s:
        s = s.replace("  ", " ")
    return s
|
|
|
|
|
|
@dataclass
|
|
class Volume:
|
|
"""Container Volume."""
|
|
|
|
name: str
|
|
path: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return None
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Volume key is present, but malformed.")
|
|
return None
|
|
return [cls(key, value) for key, value in val.items()]
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"--volume {self.name}:{self.path}"
|
|
|
|
def create(self) -> str:
|
|
"""Create volume, if it does not exist."""
|
|
cmd = f"# Create volume {self.name}\n"
|
|
cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
|
|
cmd += "then\n"
|
|
cmd += "\tpodman volume create '{self.name}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
def remove(self) -> str:
|
|
"""Remove volume if it exists."""
|
|
cmd = f"# Remove volume {self.name}\n"
|
|
cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
|
|
cmd += "then\n"
|
|
cmd += f"\tpodman volume rm '{self.name}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Secret:
|
|
"""Container Secret."""
|
|
|
|
name: str
|
|
secret_type: str
|
|
target: str
|
|
options: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return None
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Secret key is present, but malformed!")
|
|
return None
|
|
secrets = []
|
|
for key in val:
|
|
if not isinstance(val[key], dict):
|
|
logger.log_warning(f"Secret {key} is malformed!")
|
|
continue
|
|
name = key
|
|
secret_type = maybe(val[key], "type")
|
|
target = maybe(val[key], "target")
|
|
options = maybe(val[key], "options")
|
|
if options is None:
|
|
options = ""
|
|
secrets.append(
|
|
cls(name, str(secret_type), str(target), str(options))
|
|
)
|
|
|
|
return secrets
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
cmd = (
|
|
f"--secret {self.name},type={self.secret_type},target={self.target}"
|
|
)
|
|
# Not a password, ruff...
|
|
if self.secret_type == "mount" and self.options != "": # noqa: S105
|
|
cmd = f"{cmd},{self.options}"
|
|
|
|
return cmd
|
|
|
|
def create(self) -> str:
|
|
"""Create secret if it does not exist."""
|
|
cmd = f"# Create secret {self.name}\n"
|
|
cmd += f"if ! podman secret exists '{self.name}' 2> /deb/null\n"
|
|
cmd += "then\n"
|
|
cmd += "\thead /dev/urandom \\\n"
|
|
cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
|
|
cmd += "\t\t| head -c 128 \\\n"
|
|
cmd += f"\t\t| podman secret create '{self.name}' -\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
def remove(self) -> str:
|
|
"""Remove secret if it exists."""
|
|
cmd = f"# Remove secret {self.name}\n"
|
|
cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
|
|
cmd += "then\n"
|
|
cmd += f"\tpodman secret rm '{self.name}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Environment:
|
|
"""Container environment."""
|
|
|
|
variables: list
|
|
file: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return cls([], "")
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Environment key is present, but malformed!")
|
|
return cls([], "")
|
|
return cls([f"{key}='{value}'" for key, value in val.items()], "")
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"--env-file={self.file}"
|
|
|
|
def create(self) -> str:
|
|
"""Create env file."""
|
|
cmd = f"# Create env-file {self.file}\n"
|
|
for var in self.variables:
|
|
escaped_var = var.replace("'", "%b")
|
|
cmd += f"printf '{escaped_var}\\n' \"'\" \"'\" >> '{self.file}'\n"
|
|
|
|
return cmd
|
|
|
|
def remove(self) -> str:
|
|
"""Remove env file."""
|
|
cmd = f"# Remove env-file {self.file}\n"
|
|
cmd += f"if [ -e '{self.file}' ]\n"
|
|
cmd += "then\n"
|
|
cmd += "\trm '{self.file}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Ports:
|
|
"""Container Ports."""
|
|
|
|
tcp_ports: list
|
|
udp_ports: list
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return cls([], [])
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Ports key is present, but malformed!")
|
|
return cls([], [])
|
|
tcp_ports = maybe(val, "tcp")
|
|
udp_ports = maybe(val, "udp")
|
|
if not isinstance(tcp_ports, list):
|
|
logger.log_warning("Key tcp_ports is not an array!")
|
|
return cls([], [])
|
|
if not isinstance(udp_ports, list):
|
|
logger.log_warning("Key udp_ports is not an array!")
|
|
return cls([], [])
|
|
return cls(tcp_ports, udp_ports)
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
ports = ""
|
|
seperator = " \\\n"
|
|
ports += seperator.join(
|
|
[f"\t--port {port}/tcp" for port in self.tcp_ports]
|
|
)
|
|
ports += seperator
|
|
ports += seperator.join(
|
|
[f"\t--port {port}/udp" for port in self.udp_ports]
|
|
)
|
|
ports += seperator
|
|
return ports
|
|
|
|
|
|
@dataclass
|
|
class Network:
|
|
"""Container Network."""
|
|
|
|
mode: str
|
|
options: list
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_error("Network configuration is missing or malformed!")
|
|
return cls("", [])
|
|
mode = maybe(val, "mode")
|
|
options = maybe(val, "options")
|
|
if mode is None or options is None or not isinstance(options, list):
|
|
err = "Network configuration is missing or has malformed elements!"
|
|
logger.log_error(err)
|
|
return cls("", [])
|
|
return cls(str(mode), options)
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
if self.mode == "":
|
|
return ""
|
|
cmd = f"--network={self.mode}"
|
|
opts = ",".join(self.options)
|
|
if opts != "":
|
|
cmd += f":{opts}"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Image:
|
|
"""Container Image."""
|
|
|
|
registry: str
|
|
image: str
|
|
tag: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_error("Image key either not present or malformed!")
|
|
return None
|
|
registry = maybe(val, "registry")
|
|
image = maybe(val, "image")
|
|
tag = maybe(val, "tag")
|
|
return cls(str(registry), str(image), str(tag))
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"{self.registry}/{self.image}:{self.tag}"
|
|
|
|
|
|
@dataclass
|
|
class Capability:
|
|
"""Container Capability."""
|
|
|
|
cap: str
|
|
mode: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_warning(
|
|
"Capabilities key is either missing or malformed!"
|
|
)
|
|
return None
|
|
add = [cls(value, "add") for value in val["add"]]
|
|
drop = [cls(value, "drop") for value in val["drop"]]
|
|
return add + drop
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"--cap-{self.mode}={self.cap}"
|
|
|
|
|
|
@dataclass
|
|
class Dns:
|
|
"""Container DNS."""
|
|
|
|
servers: list
|
|
search: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_error("DNS Key is either missing or malformed!")
|
|
return cls([], "")
|
|
search = maybe(val, "search")
|
|
servers = maybe(val, "servers")
|
|
if not isinstance(servers, list):
|
|
logger.log_error("Servers key is not an array!")
|
|
return cls([], "")
|
|
return cls(servers, str(search))
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
if len(self.servers) == 0 and self.search == "":
|
|
return ""
|
|
if len(self.servers) == 0:
|
|
return f"--dns-search={self.search}"
|
|
if self.search == "":
|
|
return f"--dns={','.join(self.servers)}"
|
|
|
|
cmd = f"--dns-search={self.search} \\\n\t--dns="
|
|
cmd += ",".join(self.servers)
|
|
|
|
return cmd
|
|
|
|
|
|
class Container:
    """Container.

    Aggregates every configured aspect of one container and generates the
    shell commands that create, start, stop, upgrade and remove it together
    with its volumes, secrets and env-file.
    """

    name: str
    image: Image
    privileged: bool
    read_only: bool
    replace: bool
    restart: str
    pull_policy: str
    network: Network
    dns: Dns
    ports: Ports
    env: Environment | None
    secrets: list | None
    volumes: list | None
    capabilities: list | None

    def __init__(self, json: dict, logger: Log | None = None) -> None:
        """Create from JSON.

        *logger* defaults to a ``Log`` writing to /dev/stdout. Missing
        "pull_policy" and "restart" keys default to "always" and "no".

        NOTE: when "name" or "image" is missing an error is logged and the
        constructor returns early, leaving the instance without any of its
        attributes set.
        """
        if logger is None:
            logger = Log("/dev/stdout")
        name = maybe(json, "name")
        if name is None:
            logger.log_error("No container name set, aborting!")
            return
        image = maybe(json, "image")
        if image is None:
            logger.log_error("No image set, aborting!")
            return
        pull_policy = maybe(json, "pull_policy")
        if pull_policy is None:
            pull_policy = "always"
        restart = maybe(json, "restart")
        if restart is None:
            restart = "no"
        self.name = str(name)
        self.image = Image.from_json(image, logger)
        # bool(None) is False, so an absent flag counts as disabled.
        self.privileged = bool(maybe(json, "privileged"))
        self.read_only = bool(maybe(json, "read_only"))
        self.replace = bool(maybe(json, "replace"))
        self.pull_policy = str(pull_policy)
        self.restart = str(restart)
        self.network = Network.from_json(maybe(json, "network"), logger)
        self.dns = Dns.from_json(maybe(json, "dns"), logger)
        self.ports = Ports.from_json(maybe(json, "ports"), logger)
        self.env = Environment.from_json(maybe(json, "env"), logger)
        self.env.file = "/var/lib/containerctl/containers/"
        self.env.file += f"{self.name}/environment"
        self.secrets = Secret.from_json(maybe(json, "secrets"), logger)
        self.volumes = Volume.from_json(maybe(json, "volumes"), logger)
        self.capabilities = Capability.from_json(
            maybe(json, "capabilities"), logger
        )

    def create_volumes(self) -> str:
        """Generate podman volume create commands."""
        if self.volumes is None:
            return ""
        return "".join(volume.create() for volume in self.volumes)

    def create_secrets(self) -> str:
        """Generate podman secret create commands."""
        if self.secrets is None:
            return ""
        return "".join(secret.create() for secret in self.secrets)

    def create_environment(self) -> str:
        """Wrap self.env.create()."""
        if self.env is None:
            return ""
        return self.env.create()

    def create_container(self) -> str:
        """Generate podman container create command.

        The command spans several lines joined by shell line continuations
        and ends with the image reference.
        """
        cmd = f"# Create container {self.name}\n"
        cmd += "podman container create \\\n"
        cmd += f"\t--name={self.name} \\\n"
        if self.privileged:
            cmd += "\t--privileged \\\n"
        if self.replace:
            cmd += "\t--replace \\\n"
        if self.read_only:
            cmd += "\t--read-only \\\n"
        cmd += f"\t--restart={self.restart} \\\n"
        cmd += f"\t--pull={self.pull_policy} \\\n"
        cmd += f"\t{self.network.command()} \\\n"
        cmd += f"\t{self.dns.command()} \\\n"
        cmd += f"{self.ports.command()}"
        if self.env is not None:
            cmd += f"\t{self.env.command()} \\\n"
        # secrets, volumes and capabilities are None when not configured
        # (or malformed); treat that as "no options" instead of failing.
        for secret in self.secrets or []:
            cmd += f"\t{secret.command()} \\\n"
        for volume in self.volumes or []:
            cmd += f"\t{volume.command()} \\\n"
        for capability in self.capabilities or []:
            cmd += f"\t{capability.command()} \\\n"
        cmd += f"\t{self.image.command()}\n"
        return cmd

    def start_container(self) -> str:
        """Generate podman container start command."""
        cmd = f"# Start container {self.name}\n"
        cmd += f"podman container start {self.name}\n"
        return cmd

    def stop_container(self) -> str:
        """Generate podman container stop command."""
        cmd = f"# Stop container {self.name}\n"
        cmd += f"podman container stop {self.name}\n"
        return cmd

    def restart_container(self) -> str:
        """Generate podman container restart composite command.

        The container is stopped and, after a five second pause, started
        again.
        """
        cmd = f"# Restart container {self.name}\n"
        cmd += self.stop_container()
        cmd += "sleep 5s\n"
        cmd += self.start_container()
        return cmd

    def upgrade_container(self) -> str:
        """Generate podman container upgrade composite command.

        Removes the container and its env-file, recreates both and starts
        the container; volumes and secrets are left untouched.
        """
        comment = f"# Upgrade container {self.name}\n"
        remove_container = self.remove_container()
        remove_env_file = self.remove_environment()
        create_container = self.create_container()
        create_env_file = self.create_environment()
        start_container = self.start_container()
        remove = f"{remove_container}\n{remove_env_file}\n"
        create = f"{create_env_file}\n{create_container}\n"
        create_and_start = create + "\n" + start_container
        return f"{comment}\n{remove}\n{create_and_start}"

    def purge_container(self) -> str:
        """Generate podman container purge composite command.

        Removes the container followed by its volumes, secrets and env-file.
        """
        remove_container = self.remove_container()
        remove_volumes = self.remove_volumes()
        remove_secrets = self.remove_secrets()
        remove_env_file = self.remove_environment()
        remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}"
        return f"{remove_container}\n{remove_data}"

    def remove_container(self) -> str:
        """Generate podman container remove command.

        The container is stopped first.
        """
        cmd = self.stop_container()
        cmd += f"\n# Remove container {self.name}\n"
        cmd += f"podman container rm {self.name}\n"
        return cmd

    def remove_volumes(self) -> str:
        """Generate podman volume remove commands."""
        if self.volumes is None:
            return ""
        return "".join(volume.remove() for volume in self.volumes)

    def remove_secrets(self) -> str:
        """Generate podman secret remove commands."""
        if self.secrets is None:
            return ""
        return "".join(secret.remove() for secret in self.secrets)

    def remove_environment(self) -> str:
        """Wrap self.env.remove()."""
        if self.env is None:
            return ""
        return self.env.remove()
|