238 lines
9.4 KiB
Python
238 lines
9.4 KiB
Python
"""Service de gestion des snapshots VM existants — listing + suppression.
|
|
|
|
- list_snapshots(db) : itère les vCenters actifs et retourne tous les snapshots
  avec leurs métadonnées (VM, nom snap, créé le, âge, description, auteur).
- delete_snapshot(db, vcenter_id, vm_moid, snap_id) : supprime un snapshot
  spécifique par son moRef.

Identification de l'auteur : on parse le préfixe du nom du snapshot
(formats PatchCenter `<user>_YYYY-MM-DD_HH-MM_avant_patch` et
`<auteur>_YYYY-MM-DD_avant_patch`, format SLPM `SLPM_<auteur>_YYYYMMDD_HHMM`).
|
|
"""
|
|
import re
|
|
import ssl
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Any, List
|
|
|
|
# Module-level logger used by every helper of this service.
log = logging.getLogger("patchcenter.snapshot_mgmt")

# pyVmomi is optional at import time: when it is missing the module still
# loads, PYVMOMI_OK is False and a warning is logged. In that case the names
# SmartConnect, Disconnect and vim are NOT defined, so callers must check
# PYVMOMI_OK before touching them.
try:
    from pyVim.connect import SmartConnect, Disconnect
    from pyVmomi import vim
except ImportError:
    PYVMOMI_OK = False
    log.warning("pyvmomi non disponible — listing snapshots impossible")
else:
    PYVMOMI_OK = True
|
|
|
|
|
|
# Snapshot naming schemes produced by the SANEF SecOps tooling:
#   PatchCenter v2 : `<user>_YYYY-MM-DD_HH-MM_avant_patch` (user = JWT login, since 2026-05-07)
#   PatchCenter v1 : `<author>_YYYY-MM-DD_avant_patch`     (legacy, based on the operator name)
#   SLPM .exe      : `SLPM_<author>_YYYYMMDD_HHMM`
# Every pattern exposes the user prefix as the named group `author`; the two
# PatchCenter patterns also expose the ISO day as the named group `date`.
# The adjacent raw-string fragments are concatenated by the compiler, so each
# compiled pattern is a single regular expression.
SNAP_PATCHCENTER_V2_RE = re.compile(
    r"^(?P<author>[A-Za-z0-9_\-\.]+)"   # user prefix
    r"_(?P<date>\d{4}-\d{2}-\d{2})"     # YYYY-MM-DD
    r"_\d{2}-\d{2}"                     # HH-MM
    r"_avant_patch$"
)
SNAP_PATCHCENTER_V1_RE = re.compile(
    r"^(?P<author>[A-Za-z0-9_\-\.]+)"   # user prefix
    r"_(?P<date>\d{4}-\d{2}-\d{2})"     # YYYY-MM-DD
    r"_avant_patch$"
)
SNAP_SLPM_RE = re.compile(
    r"^SLPM_"
    r"(?P<author>[A-Za-z0-9_\-\.]+)"    # user prefix
    r"_\d{8}"                           # YYYYMMDD
    r"_\d{4}$"                          # HHMM
)
|
|
|
|
|
|
def _detect_snap_origin(name: str):
    """Identify which SecOps tool produced a snapshot, based on its name.

    Args:
        name: snapshot name; ``None`` or an empty value is treated as "".

    Returns:
        A pair ``(origin, author)``. ``origin`` is ``"patchcenter"`` or
        ``"slpm"`` and ``author`` is the user prefix captured by the matching
        pattern. ``(None, None)`` is returned when no known format matches.
    """
    candidate = name or ""
    # Tried in this order; the first pattern that matches wins
    # (PatchCenter v2, with the time component, is checked before v1).
    known_formats = (
        ("patchcenter", SNAP_PATCHCENTER_V2_RE),
        ("patchcenter", SNAP_PATCHCENTER_V1_RE),
        ("slpm", SNAP_SLPM_RE),
    )
    for origin, pattern in known_formats:
        found = pattern.match(candidate)
        if found:
            return origin, found.group("author")
    return None, None
|
|
|
|
|
|
def _get_vcenter_creds(db):
    """Read the vCenter login and password from the secrets store.

    Each value is looked up under its ``vcenter_*`` key first and under the
    ``vsphere_*`` key when the first lookup is falsy. The selected value is
    stripped of surrounding whitespace; "" is used when both lookups are falsy.

    Args:
        db: handle passed through unchanged to ``get_secret``.

    Returns:
        A ``(user, password)`` pair of strings, either of which may be "".
    """
    from .secrets_service import get_secret

    def _lookup(primary_key, fallback_key):
        # The fallback key is only queried when the primary one yields nothing.
        raw = get_secret(db, primary_key)
        if not raw:
            raw = get_secret(db, fallback_key)
        if not raw:
            raw = ""
        return raw.strip()

    login = _lookup("vcenter_user", "vsphere_user")
    secret = _lookup("vcenter_pass", "vsphere_pass")
    return login, secret
|
|
|
|
|
|
def _connect(endpoint, user, password):
    """Open a vCenter session and return it, or ``None`` on any failure.

    Args:
        endpoint: value passed as ``host`` to ``SmartConnect``.
        user: login passed as ``user``.
        password: password passed as ``pwd``.

    Returns:
        Whatever ``SmartConnect`` returns, or ``None`` when building the TLS
        context or connecting raised; the exception is logged as a warning
        and never propagated.
    """
    try:
        # NOTE(review): hostname checking and certificate verification are
        # both switched off, so the server identity is not authenticated.
        # Presumably deliberate for self-signed vCenter certificates — confirm.
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname must be cleared before verify_mode can be CERT_NONE.
        tls_context.check_hostname = False
        tls_context.verify_mode = ssl.CERT_NONE
        session = SmartConnect(host=endpoint, user=user, pwd=password,
                               sslContext=tls_context)
    except Exception as exc:
        log.warning(f"Connexion vCenter {endpoint} échouée: {exc}")
        return None
    return session
|
|
|
|
|
|
def _walk_snapshots(snapshot_list, vm, vcenter_name, vcenter_id, vm_moid, parent_path=""):
    """Yield one dict per snapshot of *vm*, walking the snapshot tree depth-first.

    A node is yielded before its children, and children before the node's
    next sibling.

    Args:
        snapshot_list: sibling snapshot-tree nodes to start from. Each node is
            read for ``snapshot`` (whose ``_moId`` becomes ``snap_id``),
            ``name``, ``description``, ``createTime`` and ``childSnapshotList``.
        vm: object owning the tree; ``vm.name`` and
            ``vm.snapshot.currentSnapshot`` are read once per walk.
        vcenter_name: copied verbatim into every record.
        vcenter_id: copied verbatim into every record.
        vm_moid: copied verbatim into every record.
        parent_path: path prefix of the starting level ("" for the roots).

    Yields:
        dict with keys ``vcenter_id``, ``vcenter_name``, ``vm_name``,
        ``vm_moid``, ``snap_id``, ``snap_name``, ``snap_path``,
        ``description``, ``created_at`` (ISO string or None), ``age_days``
        (float rounded to 2 decimals, or None), ``author``, ``origin``,
        ``is_managed_format``, ``is_patchcenter_format`` and ``is_current``.
    """
    if not snapshot_list:
        return

    # VM-level values are identical for every node of the tree, so they are
    # read once here instead of once per snapshot.
    # NOTE(review): on pyVmomi managed objects each property read is
    # presumably a server round trip, which is what makes this hoist matter.
    vm_name = vm.name
    snapshot_info = vm.snapshot
    current = snapshot_info.currentSnapshot if snapshot_info else None
    current_moid = current._moId if current else None

    def _walk(nodes, prefix):
        for node in nodes:
            snap_moid = node.snapshot._moId  # string, e.g. 'snapshot-1234'
            name = node.name or ""
            path = f"{prefix}/{name}" if prefix else name

            # createTime may be missing; a naive datetime is taken to be UTC.
            created = getattr(node, "createTime", None)
            created_iso = None
            age_days = None
            if created is not None:
                try:
                    if created.tzinfo is None:
                        created = created.replace(tzinfo=timezone.utc)
                    created_iso = created.isoformat()
                    elapsed = datetime.now(timezone.utc) - created
                    age_days = elapsed.total_seconds() / 86400.0
                except Exception:
                    # Best effort: keep a printable creation value, drop the age.
                    created_iso = str(created)
                    age_days = None

            # Is the name one of the managed formats (PatchCenter or SLPM)?
            origin, author = _detect_snap_origin(name)

            yield {
                "vcenter_id": vcenter_id,
                "vcenter_name": vcenter_name,
                "vm_name": vm_name,
                "vm_moid": vm_moid,
                "snap_id": snap_moid,
                "snap_name": name,
                "snap_path": path,
                "description": node.description or "",
                "created_at": created_iso,
                "age_days": round(age_days, 2) if age_days is not None else None,
                "author": author,
                "origin": origin,  # 'patchcenter' | 'slpm' | None
                "is_managed_format": origin is not None,
                "is_patchcenter_format": origin == "patchcenter",
                # Current means: this node's moRef is the VM's current snapshot.
                "is_current": current_moid is not None and current_moid == snap_moid,
            }

            if node.childSnapshotList:
                yield from _walk(node.childSnapshotList, path)

    yield from _walk(snapshot_list, parent_path)
|
|
|
|
|
|
def _iter_vcenter_snapshots(si, vcenter_name, vcenter_id):
    """Yield the snapshot records of every VM visible through session *si*.

    A container view over all ``vim.VirtualMachine`` objects under the root
    folder is created and always destroyed when the generator finishes.
    An exception raised while scanning one VM is logged at debug level and
    that VM is skipped; exceptions raised outside the per-VM scan propagate.
    """
    content = si.RetrieveContent()
    container = content.viewManager.CreateContainerView(
        content.rootFolder, [vim.VirtualMachine], True)
    try:
        for vm in container.view:
            try:
                # Read the snapshot info once: re-reading it could see a
                # different state between the emptiness test and the walk.
                snapshot_info = vm.snapshot
                roots = snapshot_info.rootSnapshotList if snapshot_info else None
                if not roots:
                    continue
                yield from _walk_snapshots(roots, vm, vcenter_name, vcenter_id, vm._moId)
            except Exception as e:
                # Inaccessible VM / template / invalid config -> skip it.
                log.debug(f"VM scan err on {vcenter_name}: {e}")
    finally:
        container.Destroy()


def list_snapshots(db, vcenter_filter_id: int = None) -> Dict[str, Any]:
    """List the existing snapshots of the active vCenters.

    Args:
        db: database session; must provide ``execute(...)`` returning rows
            with ``id``, ``name`` and ``endpoint``.
        vcenter_filter_id: when truthy, only the active vCenter with this id
            is scanned. ``None`` (and 0) mean "all active vCenters".

    Returns:
        On success ``{"ok": True, "snapshots": [...], "errors": [...],
        "vcenter_count": int, "snap_count": int}``. Snapshots are sorted by
        decreasing ``age_days``; a missing age is ranked as 0. ``errors``
        holds one ``{"vcenter", "msg"}`` entry per vCenter that could not be
        reached or failed during the scan.
        On early failure (pyVmomi missing, no credentials, no matching
        vCenter) ``{"ok": False, "msg": str, "snapshots": [], "errors": []}``.
    """
    if not PYVMOMI_OK:
        return {"ok": False, "msg": "pyvmomi non disponible côté serveur PatchCenter",
                "snapshots": [], "errors": []}

    from sqlalchemy import text as sqlt

    user, pwd = _get_vcenter_creds(db)
    if not user or not pwd:
        return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)",
                "snapshots": [], "errors": []}

    # Only constant fragments are spliced into the SQL text; the id itself is
    # passed as a bound parameter.
    where = "WHERE is_active = true"
    params = {}
    if vcenter_filter_id:
        where += " AND id = :id"
        params["id"] = vcenter_filter_id
    vcs = db.execute(sqlt(f"SELECT id, name, endpoint FROM vcenters {where} ORDER BY name"),
                     params).fetchall()
    if not vcs:
        return {"ok": False, "msg": "Aucun vCenter actif", "snapshots": [], "errors": []}

    all_snaps = []
    errors = []

    for vc in vcs:
        si = _connect(vc.endpoint, user, pwd)
        if not si:
            errors.append({"vcenter": vc.name, "msg": "Connexion KO"})
            continue
        try:
            all_snaps.extend(_iter_vcenter_snapshots(si, vc.name, vc.id))
        except Exception as e:
            errors.append({"vcenter": vc.name, "msg": str(e)[:200]})
        finally:
            try:
                Disconnect(si)
            except Exception as e:
                # Best effort: a failed logout must not hide the scan result.
                log.debug(f"Disconnect err on {vc.name}: {e}")

    # Oldest snapshots first.
    all_snaps.sort(key=lambda s: (s.get("age_days") or 0), reverse=True)
    return {"ok": True, "snapshots": all_snaps, "errors": errors,
            "vcenter_count": len(vcs), "snap_count": len(all_snaps)}
|
|
|
|
|
|
def delete_snapshot(db, vcenter_id: int, vm_moid: str, snap_id: str,
                    remove_children: bool = False) -> Dict[str, Any]:
    """Delete the snapshot whose moRef is *snap_id* on the given vCenter.

    Args:
        db: database session used to look up the vCenter row and credentials.
        vcenter_id: primary key of the row in ``vcenters``.
        vm_moid: moRef of the owning VM. It is only echoed back in the
            success payload.
        snap_id: moRef of the snapshot to remove (e.g. 'snapshot-1234').
        remove_children: when True the child snapshots are removed as well.

    Returns:
        ``{"ok": True, "msg", "vcenter", "vm_moid", "snap_id"}`` when the
        removal task ends in the success state, otherwise
        ``{"ok": False, "msg", ...}``. No exception is propagated from the
        vCenter calls; they are reported in ``msg``.
    """
    if not PYVMOMI_OK:
        return {"ok": False, "msg": "pyvmomi non disponible"}

    import time
    from sqlalchemy import text as sqlt

    vc = db.execute(sqlt("SELECT id, name, endpoint FROM vcenters WHERE id=:id"),
                    {"id": vcenter_id}).fetchone()
    if not vc:
        return {"ok": False, "msg": f"vCenter id={vcenter_id} introuvable"}
    user, pwd = _get_vcenter_creds(db)
    if not user or not pwd:
        return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)"}

    si = _connect(vc.endpoint, user, pwd)
    if not si:
        return {"ok": False, "msg": f"Connexion vCenter {vc.name} KO"}
    try:
        # The managed object is rebuilt directly from its moRef.
        # NOTE(review): vm_moid is not checked against the snapshot, so any
        # snapshot moRef of this vCenter is accepted — confirm this is intended.
        snap_mo = vim.vm.Snapshot(snap_id, si._stub)
        task = snap_mo.RemoveSnapshot_Task(removeChildren=remove_children)

        # Poll once per second until the task leaves the queued/running states.
        # Each poll takes a single reading of task.info, reused for both the
        # state and the error.
        # NOTE(review): there is no upper bound on this wait.
        pending_states = (vim.TaskInfo.State.queued, vim.TaskInfo.State.running)
        info = task.info
        while info.state in pending_states:
            time.sleep(1)
            info = task.info

        if info.state == vim.TaskInfo.State.success:
            return {"ok": True, "msg": f"Snapshot supprimé sur {vc.name}",
                    "vcenter": vc.name, "vm_moid": vm_moid, "snap_id": snap_id}

        err = info.error
        if err:
            # Fall back to the fault's string form when it carries no message.
            err_msg = getattr(err, "msg", None) or str(err)
        else:
            err_msg = "task failed"
        return {"ok": False, "msg": f"Échec suppression: {err_msg}",
                "vcenter": vc.name}
    except Exception as e:
        return {"ok": False, "msg": f"Exception: {type(e).__name__}: {e}",
                "vcenter": vc.name}
    finally:
        try:
            Disconnect(si)
        except Exception as e:
            # Best effort: a failed logout must not replace the result above.
            log.debug(f"Disconnect err on {vc.name}: {e}")
|