1
0
Fork 0
containerctl/generate/container.py
Enno Tensing 72062edbf5
generate: container: Fix some issues reported by pylint
This fixes various issues reported by pylint. It now only reports two
issues, which can be ignored, since they're only about too many local
variables or class attributes.

Signed-off-by: Enno Tensing <tenno@suij.in>
2025-07-22 16:15:47 +02:00

551 lines
17 KiB
Python

#!/usr/bin/env python3
"""Util module holding Container and related objects."""
from dataclasses import dataclass
from typing import Self, TypeAlias
from log import Log
# Union of the value types accepted as the raw `val` argument of the
# from_json() constructors in this module.
ConfigValue: TypeAlias = str | dict | list | bool | None
class ConfigError(Exception):
    """Exception type for an invalid configuration."""

    # Human readable description of what is wrong with the config.
    message: str

    def __init__(self, message: str = "") -> None:
        """Initialise the base Exception and remember the message."""
        super().__init__(message)
        self.message = message

    def __str__(self) -> str:
        """Return the message prefixed with "Configuration error: "."""
        return "Configuration error: {}".format(self.message)
def join(data: list, separator: str = ",") -> str:
    """Join the items of a list into one string, keeping their order.

    The previous implementation popped items from the end of a copy and
    therefore produced the items in reverse order.

    Args:
        data: Items to join; each item is converted with str(). The list
            itself is not modified.
        separator: Text placed between two neighbouring items.

    Returns:
        The joined string, or "" for an empty list.
    """
    return separator.join(str(item) for item in data)
def maybe(json: dict, key: str) -> str | dict | list | bool | None:
"""Maybe get a value."""
try:
return json[key]
except KeyError:
return None
def command_join(items: list | None) -> str:
"""Like join(), but call command()."""
joined = ""
if items is not None:
for item in items:
joined += " " + item.command()
return joined
def trim(s: str) -> str:
    """Remove sequential whitespace.

    Three passes are applied in this order:

    1. a space directly after a tab is dropped,
    2. a tab followed by a backslash and a newline becomes one space,
    3. runs of spaces are collapsed into a single space.

    Args:
        s: Text to clean up.

    Returns:
        The cleaned up text.
    """
    s = s.replace("\t ", "\t")
    s = s.replace("\t\\\n", " ")
    # Look for *two* spaces: testing for a single space and replacing it
    # with itself never changes the string, so the loop would not end.
    while "  " in s:
        s = s.replace("  ", " ")
    return s
@dataclass
class Volume:
    """A named podman volume and the path it is mounted at."""

    # Name of the podman volume.
    name: str
    # Path that follows the colon in the --volume option.
    path: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create Volume objects from a mapping of volume name to path.

        Args:
            val: Raw config value; expected to be a dict or None.
            logger: Receives a warning when val is not a dict.

        Returns:
            One Volume per item of val, or None when val is None or not
            a dict.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Volume key is present, but malformed.")
            return None
        return [cls(key, value) for key, value in val.items()]

    def command(self) -> str:
        """Option for podman container create."""
        return f"--volume {self.name}:{self.path}"

    def create(self) -> str:
        """Return a shell snippet creating the volume if it is missing."""
        cmd = f"# Create volume {self.name}\n"
        cmd += f"if ! podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        # This line needs to be an f-string as well, otherwise the script
        # contains the literal text {self.name} instead of the volume name.
        cmd += f"\tpodman volume create '{self.name}'\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Return a shell snippet removing the volume if it exists."""
        cmd = f"# Remove volume {self.name}\n"
        cmd += f"if podman volume exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman volume rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd
@dataclass
class Secret:
    """A podman secret that is made available to the container."""

    # Name of the podman secret.
    name: str
    # Value of the type= option of --secret.
    secret_type: str
    # Value of the target= option of --secret.
    target: str
    # Extra comma separated options; "" when there are none.
    options: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create Secret objects from a mapping of name to settings.

        Each settings dict is read for the keys "type", "target" and
        "options". Entries whose settings are not a dict are skipped with
        a warning.

        Args:
            val: Raw config value; expected to be a dict or None.
            logger: Receives warnings about malformed input.

        Returns:
            A list of secrets, or None when val is None or not a dict.
        """
        if val is None:
            return None
        if not isinstance(val, dict):
            logger.log_warning("Secret key is present, but malformed!")
            return None
        secrets = []
        for name, settings in val.items():
            if not isinstance(settings, dict):
                logger.log_warning(f"Secret {name} is malformed!")
                continue
            # NOTE(review): a missing "type" or "target" is stored as the
            # string "None" because of the str() conversion below.
            secret_type = settings.get("type")
            target = settings.get("target")
            options = settings.get("options")
            if options is None:
                options = ""
            secrets.append(
                cls(name, str(secret_type), str(target), str(options))
            )
        return secrets

    def command(self) -> str:
        """Option for podman container create."""
        cmd = (
            f"--secret {self.name},type={self.secret_type},target={self.target}"
        )
        # The extra options belong to secrets of *type* mount; the target
        # is a path or variable name and is not expected to equal "mount".
        if self.secret_type == "mount" and self.options != "":
            cmd = f"{cmd},{self.options}"
        return cmd

    def create(self) -> str:
        """Return a shell snippet creating the secret if it is missing.

        The secret value is 128 characters out of A-Za-z0-9 taken from
        /dev/urandom.
        """
        cmd = f"# Create secret {self.name}\n"
        # stderr has to go to /dev/null; a misspelled path makes the
        # redirection itself fail.
        cmd += f"if ! podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += "\thead /dev/urandom \\\n"
        cmd += "\t\t| tr -dc A-Za-z0-9 \\\n"
        cmd += "\t\t| head -c 128 \\\n"
        cmd += f"\t\t| podman secret create '{self.name}' -\n"
        cmd += "fi\n"
        return cmd

    def remove(self) -> str:
        """Return a shell snippet removing the secret if it exists."""
        cmd = f"# Remove secret {self.name}\n"
        cmd += f"if podman secret exists '{self.name}' 2> /dev/null\n"
        cmd += "then\n"
        cmd += f"\tpodman secret rm '{self.name}'\n"
        cmd += "fi\n"
        return cmd
@dataclass
class Environment:
"""Container environment."""
variables: list
file: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], "")
if not isinstance(val, dict):
logger.log_warning("Environment key is present, but malformed!")
return cls([], "")
return cls([f"{key}='{value}'" for key, value in val.items()], "")
def command(self) -> str:
"""Option for podman container create."""
return f"--env-file={self.file}"
def create(self) -> str:
"""Create env file."""
cmd = f"# Create env-file {self.file}\n"
for var in self.variables:
escaped_var = var.replace("'", "%b")
cmd += f"printf '{escaped_var}\\n' \"'\" >> '{self.file}'\n"
return cmd
def remove(self) -> str:
"""Remove env file."""
cmd = f"# Remove env-file {self.file}\n"
cmd += f"if [ -e '{self.file}' ]\n"
cmd += "then\n"
cmd += "\trm '{self.file}'\n"
cmd += "fi\n"
return cmd
@dataclass
class Ports:
"""Container Ports."""
tcp_ports: list
udp_ports: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None:
return cls([], [])
if not isinstance(val, dict):
logger.log_warning("Ports key is present, but malformed!")
return cls([], [])
tcp_ports = maybe(val, "tcp")
udp_ports = maybe(val, "udp")
if not isinstance(tcp_ports, list):
logger.log_warning("Key tcp_ports is not an array!")
return cls([], [])
if not isinstance(udp_ports, list):
logger.log_warning("Key udp_ports is not an array!")
return cls([], [])
return cls(tcp_ports, udp_ports)
def command(self) -> str:
"""Option for podman container create."""
ports = ""
seperator = " \\\n"
ports += seperator.join(
[f"\t--port {port}/tcp" for port in self.tcp_ports]
)
ports += seperator
ports += seperator.join(
[f"\t--port {port}/udp" for port in self.udp_ports]
)
ports += seperator
return ports
@dataclass
class Network:
"""Container Network."""
mode: str
options: list
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("Network configuration is missing or malformed!")
return cls("", [])
mode = maybe(val, "mode")
options = maybe(val, "options")
if mode is None or options is None or not isinstance(options, list):
err = "Network configuration is missing or has malformed elements!"
logger.log_error(err)
return cls("", [])
return cls(str(mode), options)
def command(self) -> str:
"""Option for podman container create."""
if self.mode == "":
return ""
cmd = f"--network={self.mode}"
opts = join(self.options)
if opts != "":
cmd += f":{opts}"
return cmd
@dataclass
class Image:
"""Container Image."""
registry: str
image: str
tag: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self | None:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("Image key either not present or malformed!")
return None
registry = maybe(val, "registry")
image = maybe(val, "image")
tag = maybe(val, "tag")
return cls(str(registry), str(image), str(tag))
def command(self) -> str:
"""Option for podman container create."""
return f"{self.registry}/{self.image}:{self.tag}"
@dataclass
class Capability:
    """A capability that is added to or dropped from a container."""

    # Name of the capability.
    cap: str
    # Either "add" or "drop"; selects --cap-add or --cap-drop.
    mode: str

    @classmethod
    def from_json(cls, val: ConfigValue, logger: Log) -> list | None:
        """Create from a mapping with the optional keys "add" and "drop".

        Both keys are optional; indexing them directly raised KeyError
        when only one of them was configured.

        Args:
            val: Raw config value; expected to be a dict.
            logger: Receives warnings about malformed input.

        Returns:
            The added capabilities followed by the dropped ones, or None
            when val is None or not a dict.
        """
        if val is None or not isinstance(val, dict):
            logger.log_warning(
                "Capabilities key is either missing or malformed!"
            )
            return None
        capabilities = []
        for mode in ("add", "drop"):
            names = val.get(mode)
            if names is None:
                continue
            if not isinstance(names, list):
                # Iterating e.g. a string would yield single characters.
                logger.log_warning(f"Capabilities key {mode} is not an array!")
                continue
            capabilities.extend(cls(name, mode) for name in names)
        return capabilities

    def command(self) -> str:
        """Option for podman container create."""
        return f"--cap-{self.mode}={self.cap}"
@dataclass
class Dns:
"""Container DNS."""
servers: list
search: str
@classmethod
def from_json(cls, val: ConfigValue, logger: Log) -> Self:
"""Create from JSON."""
if val is None or not isinstance(val, dict):
logger.log_error("DNS Key is either missing or malformed!")
return cls([], "")
search = maybe(val, "search")
servers = maybe(val, "servers")
if not isinstance(servers, list):
logger.log_error("Servers key is not an array!")
return cls([], "")
return cls(servers, str(search))
def command(self) -> str:
"""Option for podman container create."""
if len(self.servers) == 0 and self.search == "":
return ""
if len(self.servers) == 0:
return f"--dns-search={self.search}"
if self.search == "":
return f"--dns={join(self.servers)}"
cmd = f"--dns-search={self.search} \\\n\t--dns="
cmd += join(self.servers)
return cmd
class Container:
    """A container definition that renders podman shell commands.

    The object is built from a JSON mapping. Every method returns shell
    script text; nothing in this class executes a command.
    """

    name: str
    image: Image
    privileged: bool
    read_only: bool
    replace: bool
    restart: str
    pull_policy: str
    network: Network
    dns: Dns
    ports: Ports
    env: Environment | None
    secrets: list | None
    volumes: list | None
    capabilities: list | None

    def __init__(self, json: dict, logger: Log | None = None) -> None:
        """Create from JSON.

        Args:
            json: Mapping with the container settings. "name" and "image"
                are required. "pull_policy" defaults to "always" and
                "restart" to "no"; the flags "privileged", "read_only"
                and "replace" default to False.
            logger: Receives diagnostics; Log("/dev/stdout") is created
                when omitted.
        """
        if logger is None:
            logger = Log("/dev/stdout")
        name = maybe(json, "name")
        if name is None:
            logger.log_error("No container name set, aborting!")
            # NOTE(review): returning here leaves the instance without
            # any attributes; callers must not use such an object.
            return
        image = maybe(json, "image")
        if image is None:
            logger.log_error("No image set, aborting!")
            return
        pull_policy = maybe(json, "pull_policy")
        if pull_policy is None:
            pull_policy = "always"
        restart = maybe(json, "restart")
        if restart is None:
            restart = "no"
        self.name = str(name)
        self.image = Image.from_json(image, logger)
        # bool(None) is False, so absent flags are disabled.
        self.privileged = bool(maybe(json, "privileged"))
        self.read_only = bool(maybe(json, "read_only"))
        self.replace = bool(maybe(json, "replace"))
        self.pull_policy = str(pull_policy)
        self.restart = str(restart)
        self.network = Network.from_json(maybe(json, "network"), logger)
        self.dns = Dns.from_json(maybe(json, "dns"), logger)
        self.ports = Ports.from_json(maybe(json, "ports"), logger)
        self.env = Environment.from_json(maybe(json, "env"), logger)
        # The env-file lives in a directory named after the container.
        self.env.file = "/var/lib/containerctl/containers/"
        self.env.file += f"{self.name}/environment"
        self.secrets = Secret.from_json(maybe(json, "secrets"), logger)
        self.volumes = Volume.from_json(maybe(json, "volumes"), logger)
        self.capabilities = Capability.from_json(
            maybe(json, "capabilities"), logger
        )

    def create_volumes(self) -> str:
        """Generate podman volume create commands."""
        return "".join(volume.create() for volume in self.volumes or [])

    def create_secrets(self) -> str:
        """Generate podman secret create commands."""
        return "".join(secret.create() for secret in self.secrets or [])

    def create_environment(self) -> str:
        """Wrap self.env.create()."""
        if self.env is None:
            return ""
        return self.env.create()

    def create_container(self) -> str:
        """Generate podman container create command.

        The result is one command spread over several lines joined by
        shell line continuations; the image reference is the last line.
        """
        cmd = f"# Create container {self.name}\n"
        cmd += "podman container create \\\n"
        cmd += f"\t--name={self.name} \\\n"
        if self.privileged:
            cmd += "\t--privileged \\\n"
        if self.replace:
            cmd += "\t--replace \\\n"
        if self.read_only:
            cmd += "\t--read-only \\\n"
        cmd += f"\t--restart={self.restart} \\\n"
        cmd += f"\t--pull={self.pull_policy} \\\n"
        cmd += f"\t{self.network.command()} \\\n"
        cmd += f"\t{self.dns.command()} \\\n"
        # ports.command() already contains indentation and continuations.
        cmd += f"{self.ports.command()}"
        if self.env is not None:
            cmd += f"\t{self.env.command()} \\\n"
        # The three lists are None when their config key is missing or
        # malformed; treat that like an empty list instead of crashing.
        for secret in self.secrets or []:
            cmd += f"\t{secret.command()} \\\n"
        for volume in self.volumes or []:
            cmd += f"\t{volume.command()} \\\n"
        for capability in self.capabilities or []:
            cmd += f"\t{capability.command()} \\\n"
        cmd += f"\t{self.image.command()}\n"
        return cmd

    def start_container(self) -> str:
        """Generate podman container start command."""
        cmd = f"# Start container {self.name}\n"
        cmd += f"podman container start {self.name}\n"
        return cmd

    def stop_container(self) -> str:
        """Generate podman container stop command."""
        cmd = f"# Stop container {self.name}\n"
        cmd += f"podman container stop {self.name}\n"
        return cmd

    def restart_container(self) -> str:
        """Generate podman container restart composite command.

        The container is stopped and started again after a 5 second
        sleep.
        """
        cmd = f"# Restart container {self.name}\n"
        cmd += self.stop_container()
        cmd += "sleep 5s\n"
        cmd += self.start_container()
        return cmd

    def upgrade_container(self) -> str:
        """Generate podman container upgrade composite command.

        Removes the container and its env-file, recreates both and
        starts the container.
        """
        comment = f"# Upgrade container {self.name}\n"
        remove_container = self.remove_container()
        remove_env_file = self.remove_environment()
        create_container = self.create_container()
        create_env_file = self.create_environment()
        start_container = self.start_container()
        remove = f"{remove_container}\n{remove_env_file}\n"
        create = f"{create_env_file}\n{create_container}\n"
        create_and_start = create + "\n" + start_container
        return f"{comment}\n{remove}\n{create_and_start}"

    def purge_container(self) -> str:
        """Generate podman container purge composite command.

        Removes the container followed by its volumes, secrets and
        env-file.
        """
        remove_container = self.remove_container()
        remove_volumes = self.remove_volumes()
        remove_secrets = self.remove_secrets()
        remove_env_file = self.remove_environment()
        remove_data = f"{remove_volumes}\n{remove_secrets}\n{remove_env_file}"
        return f"{remove_container}\n{remove_data}"

    def remove_container(self) -> str:
        """Generate podman container remove command.

        The container is stopped before it is removed.
        """
        cmd = self.stop_container()
        cmd += f"\n# Remove container {self.name}\n"
        cmd += f"podman container rm {self.name}\n"
        return cmd

    def remove_volumes(self) -> str:
        """Generate podman volume remove commands."""
        return "".join(volume.remove() for volume in self.volumes or [])

    def remove_secrets(self) -> str:
        """Generate podman secret remove commands."""
        return "".join(secret.remove() for secret in self.secrets or [])

    def remove_environment(self) -> str:
        """Wrap self.env.remove()."""
        if self.env is None:
            return ""
        return self.env.remove()