546 lines
17 KiB
Python
546 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""Util module holding Container and related objects."""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Self, TypeAlias
|
|
|
|
from log import Log
|
|
|
|
ConfigValue: TypeAlias = str | dict | list | bool | None
|
|
|
|
|
|
class ConfigError(Exception):
|
|
"""An invalid config."""
|
|
|
|
message: str
|
|
|
|
def __init__(self, message: str = "") -> None:
|
|
"""Create Exception object."""
|
|
self.message = message
|
|
self.super.__init__(message)
|
|
|
|
def __str__(self) -> str:
|
|
"""Convert Exception object to a string."""
|
|
return f"Configuration error: {self.message}"
|
|
|
|
|
|
def join(data: list, separator: str = ",") -> str:
|
|
"""Join a list together."""
|
|
ret: str = ""
|
|
x = data.copy()
|
|
if len(x) == 0:
|
|
return ret
|
|
while len(x) > 1:
|
|
ret += x.pop() + separator
|
|
ret += x.pop()
|
|
return ret
|
|
|
|
|
|
def maybe(json: dict, key: str) -> str | dict | list | bool | None:
|
|
"""Maybe get a value."""
|
|
try:
|
|
return json[key]
|
|
except KeyError:
|
|
return None
|
|
|
|
|
|
def command_join(items: list | None) -> str:
|
|
"""Like join(), but call command()."""
|
|
joined = ""
|
|
if items is not None:
|
|
for item in items:
|
|
joined += " " + item.command()
|
|
return joined
|
|
|
|
|
|
def trim(s: str) -> str:
|
|
"""Remove sequential whitespace."""
|
|
s = s.replace("\t ", "\t")
|
|
s = s.replace("\t\\\n", " ")
|
|
while s.__contains__(" "):
|
|
s = s.replace(" ", " ")
|
|
return s
|
|
|
|
|
|
@dataclass
|
|
class Volume:
|
|
"""Container Volume."""
|
|
|
|
name: str
|
|
path: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return None
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Volume key is present, but malformed.")
|
|
return None
|
|
return [cls(key, value) for key, value in val.items()]
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"--volume {self.name}:{self.path}"
|
|
|
|
def create(self) -> str:
|
|
"""Create volume, if it does not exist."""
|
|
cmd = f"# Create volume {self.name}\n"
|
|
cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
|
|
cmd += "then\n"
|
|
cmd += "\tpodman volume create '{self.name}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
def remove(self) -> str:
|
|
"""Remove volume if it exists."""
|
|
cmd = f"# Remove volume {self.name}\n"
|
|
cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
|
|
cmd += "then\n"
|
|
cmd += f"\tpodman volume rm '{self.name}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Secret:
|
|
"""Container Secret."""
|
|
|
|
name: str
|
|
secret_type: str
|
|
target: str
|
|
options: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return None
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Secret key is present, but malformed!")
|
|
return None
|
|
secrets = []
|
|
for key in val:
|
|
if not isinstance(val[key], dict):
|
|
logger.log_warning(f"Secret {key} is malformed!")
|
|
continue
|
|
name = key
|
|
secret_type = maybe(val[key], "type")
|
|
target = maybe(val[key], "target")
|
|
options = maybe(val[key], "options")
|
|
if options is None:
|
|
options = ""
|
|
secrets.append(
|
|
cls(name, str(secret_type), str(target), str(options))
|
|
)
|
|
|
|
return secrets
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
cmd = (
|
|
f"--secret {self.name},type={self.secret_type},target={self.target}"
|
|
)
|
|
if self.target == "mount" and self.options != "":
|
|
cmd = f"{cmd},{self.options}"
|
|
|
|
return cmd
|
|
|
|
def create(self) -> str:
|
|
"""Create secret if it does not exist."""
|
|
cmd = f"# Create secret {self.name}\n"
|
|
cmd += f"if ! podman secret exists '{self.name}' 2> /deb/null\n"
|
|
cmd += "then\n"
|
|
cmd += "\thead /dev/urandom \\\n"
|
|
cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
|
|
cmd += "\t\t| head -c 128 \\\n"
|
|
cmd += f"\t\t| podman secret create '{self.name}' -\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
def remove(self) -> str:
|
|
"""Remove secret if it exists."""
|
|
cmd = f"# Remove secret {self.name}\n"
|
|
cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
|
|
cmd += "then\n"
|
|
cmd += f"\tpodman secret rm '{self.name}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Environment:
|
|
"""Container environment."""
|
|
|
|
variables: list
|
|
file: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return cls([], "")
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Environment key is present, but malformed!")
|
|
return cls([], "")
|
|
return cls([f"{key}='{value}'" for key, value in val.items()], "")
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"--env-file={self.file}"
|
|
|
|
def create(self) -> str:
|
|
"""Create env file."""
|
|
cmd = f"# Create env-file {self.file}\n"
|
|
for var in self.variables:
|
|
escaped_var = var.replace("'", "%b")
|
|
cmd += f"printf '{escaped_var}\\n' \"'\" >> '{self.file}'\n"
|
|
|
|
return cmd
|
|
|
|
def remove(self) -> str:
|
|
"""Remove env file."""
|
|
cmd = f"# Remove env-file {self.file}\n"
|
|
cmd += f"if [ -e '{self.file}' ]\n"
|
|
cmd += "then\n"
|
|
cmd += "\trm '{self.file}'\n"
|
|
cmd += "fi\n"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Ports:
|
|
"""Container Ports."""
|
|
|
|
tcp_ports: list
|
|
udp_ports: list
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None:
|
|
return cls([], [])
|
|
if not isinstance(val, dict):
|
|
logger.log_warning("Ports key is present, but malformed!")
|
|
return cls([], [])
|
|
tcp_ports = maybe(val, "tcp")
|
|
udp_ports = maybe(val, "udp")
|
|
if not isinstance(tcp_ports, list):
|
|
logger.log_warning("Key tcp_ports is not an array!")
|
|
return cls([], [])
|
|
if not isinstance(udp_ports, list):
|
|
logger.log_warning("Key udp_ports is not an array!")
|
|
return cls([], [])
|
|
return cls(tcp_ports, udp_ports)
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
ports = ""
|
|
for port in self.tcp_ports:
|
|
ports += f"\t--port {port}/tcp \\\n"
|
|
for port in self.udp_ports:
|
|
ports += f"\t--port {port}/udp \\\n"
|
|
return ports
|
|
|
|
|
|
@dataclass
|
|
class Network:
|
|
"""Container Network."""
|
|
|
|
mode: str
|
|
options: list
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_error("Network configuration is missing or malformed!")
|
|
return cls("", [])
|
|
mode = maybe(val, "mode")
|
|
options = maybe(val, "options")
|
|
if mode is None or options is None or not isinstance(options, list):
|
|
err = "Network configuration is missing or has malformed elements!"
|
|
logger.log_error(err)
|
|
return cls("", [])
|
|
return cls(str(mode), options)
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
if self.mode == "":
|
|
return ""
|
|
cmd = f"--network={self.mode}"
|
|
opts = join(self.options)
|
|
if opts != "":
|
|
cmd += f":{opts}"
|
|
return cmd
|
|
|
|
|
|
@dataclass
|
|
class Image:
|
|
"""Container Image."""
|
|
|
|
registry: str
|
|
image: str
|
|
tag: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_error("Image key either not present or malformed!")
|
|
return None
|
|
registry = maybe(val, "registry")
|
|
image = maybe(val, "image")
|
|
tag = maybe(val, "tag")
|
|
return cls(str(registry), str(image), str(tag))
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"{self.registry}/{self.image}:{self.tag}"
|
|
|
|
|
|
@dataclass
|
|
class Capability:
|
|
"""Container Capability."""
|
|
|
|
cap: str
|
|
mode: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_warning(
|
|
"Capabilities key is either missing or malformed!"
|
|
)
|
|
return None
|
|
add = [cls(value, "add") for value in val["add"]]
|
|
drop = [cls(value, "drop") for value in val["drop"]]
|
|
return add + drop
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
return f"--cap-{self.mode}={self.cap}"
|
|
|
|
|
|
@dataclass
|
|
class Dns:
|
|
"""Container DNS."""
|
|
|
|
servers: list
|
|
search: str
|
|
|
|
@classmethod
|
|
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
|
|
"""Create from JSON."""
|
|
if val is None or not isinstance(val, dict):
|
|
logger.log_error("DNS Key is either missing or malformed!")
|
|
return cls([], "")
|
|
search = maybe(val, "search")
|
|
servers = maybe(val, "servers")
|
|
if not isinstance(servers, list):
|
|
logger.log_error("Servers key is not an array!")
|
|
return cls([], "")
|
|
return cls(servers, str(search))
|
|
|
|
def command(self) -> str:
|
|
"""Option for podman container create."""
|
|
if len(self.servers) == 0 and self.search == "":
|
|
return ""
|
|
if len(self.servers) == 0:
|
|
return f"--dns-search={self.search}"
|
|
if self.search == "":
|
|
return f"--dns={join(self.servers)}"
|
|
|
|
cmd = f"--dns-search={self.search} \\\n\t--dns="
|
|
cmd += join(self.servers)
|
|
|
|
return cmd
|
|
|
|
|
|
class Container:
|
|
"""Container."""
|
|
|
|
name: str
|
|
image: Image
|
|
privileged: bool
|
|
read_only: bool
|
|
replace: bool
|
|
restart: str
|
|
pull_policy: str
|
|
network: Network
|
|
dns: Dns
|
|
ports: Ports
|
|
env: Environment | None
|
|
secrets: list | None
|
|
volumes: list | None
|
|
capabilities: list | None
|
|
|
|
def __init__(self, json: dict, logger: Log | None = None) -> None:
|
|
"""Create from JSON."""
|
|
if logger is None:
|
|
logger = Log("/dev/stdout")
|
|
name = maybe(json, "name")
|
|
if name is None:
|
|
logger.log_error("No container name set, aborting!")
|
|
return
|
|
image = maybe(json, "image")
|
|
if image is None:
|
|
logger.log_error("No image set, aborting!")
|
|
return
|
|
privileged = maybe(json, "privileged")
|
|
read_only = maybe(json, "read_only")
|
|
replace = maybe(json, "replace")
|
|
pull_policy = maybe(json, "pull_policy")
|
|
if pull_policy is None:
|
|
pull_policy = "always"
|
|
restart = maybe(json, "restart")
|
|
if restart is None:
|
|
restart = "no"
|
|
network = maybe(json, "network")
|
|
dns = maybe(json, "dns")
|
|
ports = maybe(json, "ports")
|
|
env = maybe(json, "env")
|
|
secrets = maybe(json, "secrets")
|
|
volumes = maybe(json, "volumes")
|
|
capabilities = maybe(json, "capabilities")
|
|
self.name = str(name)
|
|
self.image = Image.from_json(image, logger)
|
|
self.privileged = privileged is not None and bool(privileged)
|
|
self.read_only = read_only is not None and bool(read_only)
|
|
self.replace = replace is not None and bool(replace)
|
|
self.pull_policy = str(pull_policy)
|
|
self.restart = str(restart)
|
|
self.network = Network.from_json(network, logger)
|
|
self.dns = Dns.from_json(dns, logger)
|
|
self.ports = Ports.from_json(ports, logger)
|
|
self.env = Environment.from_json(env, logger)
|
|
self.env.file = "/var/lib/containerctl/containers/"
|
|
self.env.file += f"{self.name}/environment"
|
|
self.secrets = Secret.from_json(secrets, logger)
|
|
self.volumes = Volume.from_json(volumes, logger)
|
|
self.capabilities = Capability.from_json(capabilities, logger)
|
|
|
|
def create_volumes(self) -> str:
|
|
"""Generate podman volume create commands."""
|
|
if self.volumes is None:
|
|
return ""
|
|
volumes = ""
|
|
for volume in self.volumes:
|
|
volumes += volume.create()
|
|
return volumes
|
|
|
|
def create_secrets(self) -> str:
|
|
"""Generate podman secret create commands."""
|
|
if self.secrets is None:
|
|
return ""
|
|
secrets = ""
|
|
for secret in self.secrets:
|
|
secrets += secret.create()
|
|
return secrets
|
|
|
|
def create_environment(self) -> str:
|
|
"""Wrap self.env.create()."""
|
|
if self.env is None:
|
|
return ""
|
|
return self.env.create()
|
|
|
|
def create_container(self) -> str:
|
|
"""Generate podman container create command."""
|
|
cmd = f"# Create container {self.name}"
|
|
cmd += "podman container crate \\\n"
|
|
cmd += f"\t--name={self.name} \\\n"
|
|
if self.privileged:
|
|
cmd += "\t--privileged \\\n"
|
|
if self.replace:
|
|
cmd += "\t--replace \\\n"
|
|
if self.read_only:
|
|
cmd += "\t--read-only \\\n"
|
|
cmd += f"\t--restart={self.restart} \\\n"
|
|
cmd += f"\t--pull={self.pull_policy} \\\n"
|
|
cmd += f"\t{self.network.command()} \\\n"
|
|
cmd += f"\t{self.dns.command()} \\\n"
|
|
cmd += f"{self.ports.command()}"
|
|
if self.env is not None:
|
|
cmd += f"\t{self.env.command()} \\\n"
|
|
for secret in self.secrets:
|
|
cmd += f"\t{secret.command()} \\\n"
|
|
for volume in self.volumes:
|
|
cmd += f"\t{volume.command()} \\\n"
|
|
for capability in self.capabilities:
|
|
cmd += f"\t{capability.command()} \\\n"
|
|
cmd += f"\t{self.image.command()}\n"
|
|
return cmd
|
|
|
|
def start_container(self) -> str:
|
|
"""Generate podman container start command."""
|
|
cmd = f"# Start container {self.name}\n"
|
|
cmd += f"podman container start {self.name}\n"
|
|
return cmd
|
|
|
|
def stop_container(self) -> str:
|
|
"""Generate podman container stop command."""
|
|
cmd = f"# Stop container {self.name}\n"
|
|
cmd += f"podman container stop {self.name}\n"
|
|
return cmd
|
|
|
|
def restart_container(self) -> str:
|
|
"""Generate podman container restart composite command."""
|
|
cmd = f"# Restart container {self.name}\n"
|
|
cmd += self.stop_container()
|
|
cmd += "sleep 5s\n"
|
|
cmd += self.start_container()
|
|
return cmd
|
|
|
|
def upgrade_container(self) -> str:
|
|
"""Generate podman container upgrade composite command."""
|
|
comment = f"# Upgrade container {self.name}\n"
|
|
remove_container = self.remove_container()
|
|
remove_env_file = self.remove_environment()
|
|
create_container = self.create_container()
|
|
create_env_file = self.create_environment()
|
|
start_container = self.start_container()
|
|
remove = f"{remove_container}\n{remove_env_file}\n"
|
|
create = f"{create_env_file}\n{create_container}\n"
|
|
create_and_start = create + "\n" + start_container
|
|
return f"{comment}\n{remove}\n{create_and_start}"
|
|
|
|
def purge_container(self) -> str:
|
|
"""Generate podman container purge composite command."""
|
|
remove_container = self.remove_container()
|
|
remove_volumes = self.remove_volumes()
|
|
remove_secrets = self.remove_secrets()
|
|
remove_env_file = self.remove_environment()
|
|
remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}"
|
|
return f"{remove_container}\n{remove_data}"
|
|
|
|
def remove_container(self) -> str:
|
|
"""Generate podman container remove command."""
|
|
cmd = self.stop_container()
|
|
cmd += f"\n# Remove container {self.name}\n"
|
|
cmd += f"podman container rm {self.name}\n"
|
|
return cmd
|
|
|
|
def remove_volumes(self) -> str:
|
|
"""Generate podman volume remove commands."""
|
|
if self.volumes is None:
|
|
return ""
|
|
volumes = ""
|
|
for volume in self.volumes:
|
|
volumes += volume.remove()
|
|
return volumes
|
|
|
|
def remove_secrets(self) -> str:
|
|
"""Generate podman secret remove commands."""
|
|
if self.secrets is None:
|
|
return ""
|
|
secrets = ""
|
|
for secret in self.secrets:
|
|
secrets += secret.remove()
|
|
return secrets
|
|
|
|
def remove_environment(self) -> str:
|
|
"""Wrap self.env.remove()."""
|
|
if self.env is None:
|
|
return ""
|
|
return self.env.remove()
|