Se adoptó la versión actualmente recomendada por el equipo de Docker: el formato "compose.yaml". También se crearon nuevos scripts y se actualizaron algunos para adaptarse a los nuevos archivos.
467 lines
16 KiB
Python
467 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
restore_compose_volumes.py
|
|
--------------------------
|
|
Restaura volúmenes desde backups generados por backup_compose_volumes.py.
|
|
|
|
- Busca carpetas ./docker-volumes-<timestamp>
|
|
- Lee .tar.gz (nombres: <volume_name>-<YYYYMMDD-HHMMSS>.tar.gz)
|
|
- Dos modos:
|
|
1) Tradicional (sin labels)
|
|
2) Reconocido por Compose (aplica labels com.docker.compose.* para evitar el warning)
|
|
|
|
Además:
|
|
- Si un volumen existe y está en uso, ofrece detener y eliminar contenedores que lo usan
|
|
para poder recrearlo con labels correctos (solo en modo 2).
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import shlex
|
|
import pathlib
|
|
import subprocess
|
|
from typing import List, Tuple, Dict, Optional
|
|
|
|
# Directory scanned for backup folders: the working directory at import time.
PROJECT_ROOT = pathlib.Path.cwd()
# Backup folder names: "docker-volumes-" followed by 8 digits, "-", 6 digits.
BACKUP_DIR_PATTERN = re.compile(r"^docker-volumes-\d{8}-\d{6}$")
# Archive names: "<basename>-<8 digits>-<6 digits>.tar.gz"; the named groups
# expose the part before the timestamp ("basename") and the timestamp ("ts").
ARCHIVE_PATTERN = re.compile(r"^(?P<basename>.+)-(?P<ts>\d{8}-\d{6})\.tar\.gz$")
|
|
|
|
# ---------- utils ----------
|
|
|
|
def run(cmd: List[str], check: bool = False, capture_output: bool = True, text: bool = True) -> subprocess.CompletedProcess:
    """Execute *cmd* with ``subprocess.run`` and return the completed process.

    The keyword arguments are forwarded unchanged, so ``check=True`` makes a
    non-zero exit status raise ``subprocess.CalledProcessError``.
    """
    options = {"check": check, "capture_output": capture_output, "text": text}
    return subprocess.run(cmd, **options)
|
|
|
|
def which(prog: str) -> bool:
    """Return True when ``shutil.which`` can locate *prog*, False otherwise."""
    import shutil

    located = shutil.which(prog)
    return located is not None
|
|
|
|
def fail(msg: str):
    """Print *msg* behind a failure marker and terminate with exit status 1."""
    print(f"✗ {msg}")
    # Equivalent to sys.exit(1): both raise SystemExit with code 1.
    raise SystemExit(1)
|
|
|
|
def ok(msg: str):
    """Print *msg* prefixed with the success marker."""
    line = f"✓ {msg}"
    print(line)
|
|
|
|
def info(msg: str):
    """Print *msg* prefixed with the informational bullet."""
    line = f"• {msg}"
    print(line)
|
|
|
|
def warn(msg: str):
    """Print *msg* prefixed with the warning marker."""
    line = f"! {msg}"
    print(line)
|
|
|
|
def yes_no(prompt: str, default: str = "n") -> bool:
    """Ask a yes/no question on stdin until a recognised answer is given.

    An empty answer selects the default, which counts as "yes" only when
    *default* is "y" (case-insensitive). Accepted affirmative answers are
    y/yes/s/si/sí; accepted negative answers are n/no.
    """
    default_is_yes = default.lower() == "y"
    hint = "[Y/n]" if default_is_yes else "[y/N]"
    affirmative = ("y", "yes", "s", "si", "sí")
    negative = ("n", "no")
    while True:
        answer = input(f"{prompt} {hint} ").strip().lower()
        if answer == "":
            return default_is_yes
        if answer in affirmative:
            return True
        if answer in negative:
            return False
        print("Respuesta no reconocida. Responde 'y' o 'n'.")
|
|
|
|
# ---------- docker helpers ----------
|
|
|
|
def ensure_alpine_image():
    """Pull ``alpine:latest`` when ``docker image inspect`` cannot find it.

    A failing pull raises ``subprocess.CalledProcessError``.
    """
    image = "alpine:latest"
    probe = run(["docker", "image", "inspect", image])
    if probe.returncode == 0:
        return
    info("Descargando alpine:latest ...")
    # Output is not captured so the pull progress is shown to the user.
    run(["docker", "pull", image], check=True, capture_output=False, text=True)
|
|
|
|
def volume_exists(name: str) -> bool:
    """Return True when ``docker volume inspect <name>`` exits with status 0."""
    probe = run(["docker", "volume", "inspect", name])
    return probe.returncode == 0
|
|
|
|
def inspect_volume_labels(name: str) -> Dict[str, str]:
    """Return the labels reported by ``docker volume inspect`` for *name*.

    An empty dict is returned when the inspect command fails or when the
    reported labels are empty/null.
    """
    inspect_cmd = ["docker", "volume", "inspect", name, "--format", "{{json .Labels}}"]
    result = run(inspect_cmd)
    if result.returncode != 0:
        return {}
    # Empty output is treated like a JSON null.
    raw = result.stdout if result.stdout else "null"
    parsed = json.loads(raw)
    return parsed if parsed else {}
|
|
|
|
def containers_using_volume(name: str) -> List[str]:
    """List the IDs printed by ``docker ps -a --filter volume=<name> -q``.

    Returns an empty list when the command exits with a non-zero status.
    """
    listing = run(["docker", "ps", "-a", "--filter", f"volume={name}", "-q"])
    if listing.returncode != 0:
        return []
    stripped = (line.strip() for line in listing.stdout.splitlines())
    return [container_id for container_id in stripped if container_id]
|
|
|
|
def stop_containers(ids: List[str]) -> None:
    """Run ``docker stop`` on *ids*; do nothing when the list is empty.

    The exit status of the command is not checked.
    """
    if ids:
        info("Deteniendo contenedores que usan el volumen...")
        run(["docker", "stop", *ids], check=False, capture_output=False)
|
|
|
|
def remove_containers(ids: List[str]) -> None:
    """Run ``docker rm`` on *ids*; do nothing when the list is empty.

    The exit status of the command is not checked.
    """
    if ids:
        info("Eliminando contenedores detenidos que usan el volumen...")
        run(["docker", "rm", *ids], check=False, capture_output=False)
|
|
|
|
def remove_volume(name: str) -> bool:
    """Run ``docker volume rm -f <name>`` and report whether it succeeded.

    On a non-zero exit status a warning is printed and False is returned.
    """
    try:
        run(["docker", "volume", "rm", "-f", name], check=True, capture_output=False)
    except subprocess.CalledProcessError as err:
        warn(f"No se pudo eliminar volumen '{name}': {err}")
        return False
    return True
|
|
|
|
def create_volume(name: str, labels: Optional[Dict[str,str]] = None) -> bool:
    """Run ``docker volume create`` for *name*, optionally attaching labels.

    Each entry of *labels* becomes a ``--label key=value`` argument. Returns
    True on success; on a non-zero exit status a warning is printed and
    False is returned.
    """
    label_args: List[str] = []
    for key, value in (labels or {}).items():
        label_args.extend(("--label", f"{key}={value}"))
    cmd = ["docker", "volume", "create", *label_args, name]
    try:
        run(cmd, check=True, capture_output=False)
    except subprocess.CalledProcessError as err:
        warn(f"Fallo creando volumen '{name}': {err}")
        return False
    return True
|
|
|
|
def restore_into_volume(volume_name: str, backup_dir: pathlib.Path, archive_file: pathlib.Path) -> int:
    """Extract *archive_file* into the Docker volume *volume_name*.

    A throw-away ``alpine:latest`` container mounts the volume at ``/volume``
    and the resolved *backup_dir* at ``/backup``, then runs
    ``tar xzf /backup/<archive name> -C /volume``.

    Args:
        volume_name: Name of the Docker volume to extract into.
        backup_dir: Host directory that contains the archive.
        archive_file: Archive path; only its file name is used, so it is
            expected to live directly inside *backup_dir*.

    Returns:
        The exit status of the ``docker run`` command (0 on success).
    """
    backup_root = backup_dir.resolve()
    docker_cmd = [
        "docker", "run", "--rm",
        "-v", f"{volume_name}:/volume",
        # Read-only mount: a restore must never be able to modify the backups.
        "-v", f"{backup_root}:/backup:ro",
        "alpine:latest",
        # tar is executed directly with an argument vector, so no shell is
        # involved and the archive name needs no quoting.
        "tar", "xzf", f"/backup/{archive_file.name}", "-C", "/volume",
    ]
    return subprocess.run(docker_cmd).returncode
|
|
|
|
# ---------- parsing helpers ----------
|
|
|
|
def find_backup_dirs(root: pathlib.Path) -> List[pathlib.Path]:
    """Return the backup directories directly under *root*, newest first.

    A directory qualifies when its name matches ``BACKUP_DIR_PATTERN``;
    ordering is by modification time, most recent first.
    """
    candidates = []
    for entry in root.iterdir():
        if entry.is_dir() and BACKUP_DIR_PATTERN.match(entry.name):
            candidates.append(entry)
    return sorted(candidates, key=lambda entry: entry.stat().st_mtime, reverse=True)
|
|
|
|
def find_archives(dirpath: pathlib.Path) -> List[pathlib.Path]:
    """Return the regular ``*.tar.gz`` files directly in *dirpath*, sorted by name."""
    archives = (
        entry
        for entry in dirpath.iterdir()
        if entry.name.endswith(".tar.gz") and entry.is_file()
    )
    return sorted(archives, key=lambda entry: entry.name)
|
|
|
|
def parse_archive_basename(archive_name: str) -> Optional[str]:
    """Return the part of *archive_name* that precedes the timestamp suffix.

    The name is matched against ``ARCHIVE_PATTERN``; None is returned when it
    does not match.
    """
    match = ARCHIVE_PATTERN.match(archive_name)
    return match.group("basename") if match else None
|
|
|
|
# ---------- compose label helpers ----------
|
|
|
|
def derive_labels_auto(volume_name: str) -> Tuple[Optional[str], Optional[str]]:
    """Split *volume_name* into a ``(project, short)`` pair.

    The name is split at the first '_' if it contains one; otherwise at the
    first '-'. The separator itself is dropped. ``(None, None)`` is returned
    when the name contains neither separator.
    """
    for separator in ("_", "-"):
        project, found, short = volume_name.partition(separator)
        if found:
            return project, short
    return None, None
|
|
|
|
def derive_labels_with_fixed_project(volume_name: str, project: str) -> Tuple[str, str]:
    """Return ``(project, short)`` using the given *project* name.

    When *volume_name* starts with ``<project>_`` or ``<project>-`` that
    prefix is stripped to obtain the short name; otherwise the whole volume
    name is used as the short name.
    """
    for separator in ("_", "-"):
        prefix = project + separator
        if volume_name.startswith(prefix):
            return project, volume_name[len(prefix):]
    return project, volume_name
|
|
|
|
def labels_match_compose(name: str, project: str, short: str) -> bool:
    """Check whether volume *name* already carries the expected Compose labels.

    True only when ``com.docker.compose.project`` equals *project* and
    ``com.docker.compose.volume`` equals *short*.
    """
    current = inspect_volume_labels(name)
    expected = {
        "com.docker.compose.project": project,
        "com.docker.compose.volume": short,
    }
    return all(current.get(key) == value for key, value in expected.items())
|
|
|
|
# ---------- UI flows ----------
|
|
|
|
def pick_backup_dir(dirs: List[pathlib.Path]) -> Optional[pathlib.Path]:
    """Let the user choose one backup directory from *dirs*.

    The directories are listed with 1-based numbers and the user is prompted
    until a valid number is entered.

    Returns:
        The selected directory, or None when *dirs* is empty (a warning is
        printed) or the user presses Enter without typing anything.
    """
    if not dirs:
        warn("No se encontraron carpetas de backup 'docker-volumes-<timestamp>'.")
        return None
    print("\nCarpetas de backup encontradas:")
    for i, d in enumerate(dirs, 1):
        print(f" {i}) {d.name}")
    while True:
        sel = input("> Elige una carpeta (número) o Enter para cancelar: ").strip()
        if not sel:
            return None
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as "²" that int() rejects, which used to crash with ValueError.
        if sel.isdecimal() and 1 <= int(sel) <= len(dirs):
            return dirs[int(sel) - 1]
        print("Opción inválida.")
|
|
|
|
def pick_archives(files: List[pathlib.Path]) -> List[pathlib.Path]:
    """Let the user choose which archives from *files* to restore.

    The archives are listed with 1-based numbers next to the volume name
    parsed from each file name. The user then picks 'a' (all) or 's' (a
    comma-separated list of numbers) and is re-prompted until the choice
    yields at least one archive.

    Returns:
        All of *files* for 'a'; for 's' the chosen files in listing order,
        with duplicates and out-of-range numbers ignored. An empty list is
        returned (after a warning) when *files* is empty.
    """
    if not files:
        warn("No hay archivos .tar.gz en esa carpeta.")
        return []
    print("\nBackups disponibles:")
    for i, f in enumerate(files, 1):
        base = parse_archive_basename(f.name) or f.name
        print(f" {i}) {f.name} -> volumen: {base}")
    print("\nOpciones:")
    print(" a) Restaurar TODOS")
    print(" s) Seleccionar algunos (ej: 1,3,5)")
    while True:
        sel = input("> Elige 'a' o 's': ").strip().lower()
        if sel == "a":
            return files
        if sel != "s":
            print("Opción inválida.")
            continue
        picks = input("> Números separados por coma: ").strip()
        # Only the int() conversion can fail here, so catch exactly that
        # instead of swallowing every exception.
        try:
            idxs = {int(tok) - 1 for tok in picks.split(",") if tok.strip()}
        except ValueError:
            idxs = set()
        chosen = [files[i] for i in sorted(idxs) if 0 <= i < len(files)]
        if chosen:
            return chosen
        print("Selección inválida.")
|
|
|
|
def pick_restore_mode() -> str:
    """Show the restore-mode menu and return the user's choice, "1" or "2".

    The prompt is repeated until one of the two options is entered.
    """
    menu = (
        "\nModo de restauración:",
        " 1) Tradicional (sin labels)",
        " 2) Reconocido por Compose (aplica labels para evitar el warning)",
    )
    for line in menu:
        print(line)
    valid_choices = ("1", "2")
    while True:
        answer = input("> Elige 1 o 2: ").strip()
        if answer in valid_choices:
            return answer
        print("Opción inválida.")
|
|
|
|
def confirm_overwrite(volume_name: str) -> bool:
    """Ask whether the existing volume *volume_name* may be recreated (default: no)."""
    question = f"El volumen '{volume_name}' ya existe. ¿Sobrescribir (recrear)?"
    return yes_no(question, default="n")
|
|
|
|
# ---------- restore flows ----------
|
|
|
|
def restore_traditional(backup_dir: pathlib.Path, archives: List[pathlib.Path]):
    """Restore each archive into a freshly created volume without labels.

    For every archive the target volume name is parsed from the file name.
    If that volume already exists the user must confirm recreating it, and,
    when containers reference it, confirm stopping and removing them first.
    Archives are skipped (with a message) whenever a name cannot be parsed,
    the user declines, or a Docker step fails.
    """
    ensure_alpine_image()
    print("\n=== Restauración TRADICIONAL ===\n")
    for archive in archives:
        volume = parse_archive_basename(archive.name)
        if not volume:
            warn(f"Nombre de backup no reconocible: {archive.name}, se omite.")
            continue
        info(f"Volumen: {volume}")

        # Traditional mode never touches labels: an existing volume is
        # removed and recreated before the data is extracted.
        if volume_exists(volume):
            if not confirm_overwrite(volume):
                info(" → Omitido (ya existe).")
                continue
            container_ids = containers_using_volume(volume)
            if container_ids:
                # A volume referenced by containers cannot be removed, so
                # offer to stop and delete those containers.
                info(f"Contenedores que usan '{volume}': {', '.join(container_ids)}")
                if not yes_no("¿Detener y eliminar esos contenedores para continuar?", default="y"):
                    warn(" → No se puede recrear el volumen en uso. Omitido.")
                    continue
                stop_containers(container_ids)
                remove_containers(container_ids)
            if not remove_volume(volume):
                warn(" → No se pudo eliminar el volumen. Omitido.")
                continue

        if not create_volume(volume):
            warn(" → No se pudo crear el volumen, se omite.")
            continue
        exit_code = restore_into_volume(volume, backup_dir, archive)
        if exit_code != 0:
            warn(f" Falló la restauración (rc={exit_code}).")
        else:
            ok(" Restaurado.")
|
|
|
|
def restore_with_compose_labels(backup_dir: pathlib.Path, archives: List[pathlib.Path]):
    """Restore archives into volumes that carry Docker Compose labels.

    The volumes are created with ``com.docker.compose.project`` and
    ``com.docker.compose.volume`` labels so that Compose does not print the
    warning "volume ... already exists but was not created by Docker
    Compose...".

    Flow:
      1. Ask for a labelling strategy: derive the project from each volume
         name, or use one fixed project for all volumes.
      2. Show a preview of the labels and ask for confirmation.
      3. For each archive: prompt for labels that could not be derived; if
         the volume exists with matching labels, extract over it; otherwise
         (after confirmation) stop/remove the containers using it, remove
         it, recreate it with labels and extract the archive.

    Archives whose name cannot be parsed are left out without a message.
    """

    def _report(exit_code: int, success_message: str) -> None:
        # Print the outcome of one extraction.
        if exit_code == 0:
            ok(success_message)
        else:
            warn(f" Falló la restauración (rc={exit_code}).")

    ensure_alpine_image()
    for line in (
        "\n=== Restauración RECONOCIDA POR COMPOSE (con labels) ===\n",
        "Estrategia de etiquetado:",
        " 1) Auto (project = prefijo de <vol> antes de '_' o '-', short = resto)",
        " 2) Fijar un 'project' para todos (p. ej. suitecoffee, suitecoffee_dev, suitecoffee_prod)",
    ):
        print(line)

    strategy = ""
    while strategy not in ("1", "2"):
        strategy = input("> Elige 1 o 2: ").strip()

    fixed_project = None
    if strategy == "2":
        fixed_project = input("> Indica el 'project' de Compose (exacto): ").strip()
        if not fixed_project:
            warn("Project vacío, cancelado.")
            return

    # Build the label plan: (archive, volume name, project, short name).
    plan = []
    for archive in archives:
        volume = parse_archive_basename(archive.name)
        if not volume:
            continue
        if strategy == "1":
            label_pair = derive_labels_auto(volume)
        else:
            label_pair = derive_labels_with_fixed_project(volume, fixed_project)
        plan.append((archive, volume, *label_pair))

    print("\nVista previa de etiquetas (project / volume):")
    for _, volume, project, short in plan:
        if not (project and short):
            print(f" {volume} → (no derivado; se pedirá manualmente)")
        else:
            print(f" {volume} → project='{project}', volume='{short}'")

    if not yes_no("\n¿Confirmar restauración con estas etiquetas?", default="y"):
        warn("Cancelado por el usuario.")
        return

    for archive, volume, project, short in plan:
        # Ask for the labels by hand when they could not be derived.
        if not (project and short):
            print(f"\nDefinir etiquetas para: {volume}")
            project = input(" project = ").strip()
            short = input(" volume = ").strip()
            if not (project and short):
                warn(" → Etiquetas incompletas; se omite.")
                continue

        info(f"\nVolumen: {volume} (labels: project='{project}', volume='{short}')")

        if volume_exists(volume):
            # Labels already correct: extract over the existing volume
            # without recreating it.
            if labels_match_compose(volume, project, short):
                info(" Volumen ya tiene labels de Compose correctas. Sobrescribiendo datos...")
                _report(
                    restore_into_volume(volume, backup_dir, archive),
                    " Restaurado (labels ya correctas).",
                )
                continue

            # Otherwise the volume has to be recreated with labels, which
            # needs the user's consent.
            if not yes_no(" El volumen existe sin labels correctas. ¿Detener/eliminar contenedores y recrearlo con labels para evitar el warning?", default="y"):
                warn(" → Omitido (mantiene warning de Compose).")
                continue

            container_ids = containers_using_volume(volume)
            if container_ids:
                info(f" Contenedores que usan '{volume}': {', '.join(container_ids)}")
                stop_containers(container_ids)
                remove_containers(container_ids)

            if not remove_volume(volume):
                warn(" → No se pudo eliminar el volumen. Omitido.")
                continue

        compose_labels = {
            "com.docker.compose.project": project,
            "com.docker.compose.volume": short,
        }
        if not create_volume(volume, labels=compose_labels):
            warn(" → No se pudo crear el volumen con labels. Omitido.")
            continue

        _report(
            restore_into_volume(volume, backup_dir, archive),
            " Restaurado con labels de Compose (warning resuelto).",
        )
|
|
|
|
# ---------- main ----------
|
|
|
|
def main():
    """Interactive entry point: choose a backup folder, archives and a mode.

    Steps:
      1. Abort (exit status 1) when ``docker`` is not on PATH or
         ``docker version`` fails.
      2. Look for backup folders under ``PROJECT_ROOT`` and let the user
         pick one.
      3. Let the user pick which ``.tar.gz`` archives of that folder to
         restore.
      4. Ask for the restore mode and run the matching restore flow.

    Discovery and selection are delegated to ``find_backup_dirs``,
    ``pick_backup_dir``, ``find_archives`` and ``pick_archives`` instead of
    being duplicated here, so an invalid archive selection is now reported
    rather than silently re-prompted.
    """
    if not which("docker"):
        fail("No se encontró 'docker' en el PATH.")
    try:
        run(["docker", "version"], check=True)
    except subprocess.CalledProcessError:
        fail("No se puede comunicar con el daemon de Docker. ¿Está corriendo?")

    # Choose the docker-volumes-<ts> folder.
    dirs = find_backup_dirs(PROJECT_ROOT)
    if not dirs:
        warn("No hay carpetas de backup 'docker-volumes-<timestamp>'.")
        return
    chosen = pick_backup_dir(dirs)
    if chosen is None:
        warn("Cancelado.")
        return

    # Choose the archives inside that folder.
    archives = find_archives(chosen)
    if not archives:
        warn("No hay .tar.gz en esa carpeta.")
        return
    selected = pick_archives(archives)

    # Restore mode.
    if pick_restore_mode() == "1":
        restore_traditional(chosen, selected)
    else:
        restore_with_compose_labels(chosen, selected)

    ok("\nProceso finalizado.")
|
|
|
|
if __name__ == "__main__":
    # Run the interactive flow only when executed as a script.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C ends the run with a short message instead of a traceback.
        print("\n✓ Interrumpido por el usuario (Ctrl+C).")
|