patchcenter/app/services/snapshot_mgmt_service.py
Admin MPCZ d8d803fb48 feat(snapshots): support format SLPM (.exe Sanef Patch Manager) + colonne Origine
Probleme: tes 51 snapshots etaient au format SLPM_<auteur>_YYYYMMDD_HHMM (cree par le .exe)
non reconnu par PatchCenter qui n'attendait que le format <auteur>_YYYY-MM-DD_avant_patch.

- Service: nouveau regex SNAP_SLPM_RE + helper _detect_snap_origin retourne (origin, author)
- Champs ajoutes au snapshot: origin ('patchcenter'|'slpm'|None), is_managed_format
- Template:
  * Filtre 'Format gere uniquement' (renomme depuis 'PatchCenter uniquement')
  * Colonne 'Origine' avec badge: PatchCenter (bleu) / SLPM .exe (gris) / manuel (orange)
  * Colonne ajoutee dans header + cellules + colspan ajuste a 9
2026-05-07 20:47:32 +02:00

229 lines
8.9 KiB
Python

"""Management service for existing VM snapshots: listing and deletion.

- list_snapshots(db): iterates over the active vCenters and returns every
  snapshot with its metadata (VM, snapshot name and path, creation time,
  age in days, description, author, origin).
- delete_snapshot(db, vcenter_id, vm_moid, snap_id): removes one specific
  snapshot identified by its moRef.

Author identification: the author is parsed from the snapshot name, which
must follow one of the two managed formats:
  PatchCenter (web)   : `<author>_YYYY-MM-DD_avant_patch`
  Sanef Patch Manager : `SLPM_<author>_YYYYMMDD_HHMM`
"""
import re
import ssl
import logging
from datetime import datetime, timezone
from typing import Dict, Any, List
# Module-level logger used by every helper of this service.
log = logging.getLogger("patchcenter.snapshot_mgmt")

# pyvmomi is optional at import time: its availability is recorded in
# PYVMOMI_OK, which the public entry points check before doing any work.
try:
    from pyVim.connect import SmartConnect, Disconnect
    from pyVmomi import vim
except ImportError:
    PYVMOMI_OK = False
    log.warning("pyvmomi non disponible — listing snapshots impossible")
else:
    PYVMOMI_OK = True
# Snapshot-name formats managed by the SANEF SecOps tools:
#   PatchCenter (web)   : `<author>_YYYY-MM-DD_avant_patch`
#   Sanef Patch Manager : `SLPM_<author>_YYYYMMDD_HHMM`
# Both patterns are anchored with ^ and $; the `author` group accepts
# letters, digits, `_`, `-` and `.`. Only the PatchCenter pattern captures
# the date (group `date`); the SLPM timestamp is matched but not captured.
SNAP_PATCHCENTER_RE = re.compile(
r"^(?P<author>[A-Za-z0-9_\-\.]+)_(?P<date>\d{4}-\d{2}-\d{2})_avant_patch$"
)
SNAP_SLPM_RE = re.compile(
r"^SLPM_(?P<author>[A-Za-z0-9_\-\.]+)_\d{8}_\d{4}$"
)
def _detect_snap_origin(name: str):
    """Classify a snapshot name against the managed naming formats.

    Returns ``(origin, author)`` where ``origin`` is ``'patchcenter'`` or
    ``'slpm'`` and ``author`` is the user prefix captured from the name, or
    ``(None, None)`` when the name matches neither format. A falsy name is
    treated as the empty string.
    """
    candidate = name or ""
    # The PatchCenter pattern is tried first, then the SLPM one.
    known_formats = (
        ("patchcenter", SNAP_PATCHCENTER_RE),
        ("slpm", SNAP_SLPM_RE),
    )
    for origin, pattern in known_formats:
        found = pattern.match(candidate)
        if found:
            return origin, found.group("author")
    return None, None
def _get_vcenter_creds(db):
    """Fetch the vCenter credentials through ``get_secret``.

    Each value is looked up under its ``vcenter_*`` key first and falls back
    to the ``vsphere_*`` key. Returns ``(user, password)``, both stripped of
    surrounding whitespace; a missing secret yields an empty string.
    """
    from .secrets_service import get_secret

    def _first_secret(*keys):
        # First truthy secret among `keys`, stripped; "" when none is set.
        for key in keys:
            value = get_secret(db, key)
            if value:
                return value.strip()
        return ""

    login = _first_secret("vcenter_user", "vsphere_user")
    secret = _first_secret("vcenter_pass", "vsphere_pass")
    return login, secret
def _connect(endpoint, user, password):
    """Open a vCenter session and return the service instance.

    Certificate and hostname verification are both disabled on the TLS
    context. Any exception raised while connecting is logged as a warning
    and swallowed, in which case ``None`` is returned.
    """
    try:
        tls = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        # check_hostname has to be cleared before verify_mode may be CERT_NONE.
        tls.check_hostname = False
        tls.verify_mode = ssl.CERT_NONE
        service_instance = SmartConnect(host=endpoint, user=user, pwd=password, sslContext=tls)
    except Exception as e:
        log.warning(f"Connexion vCenter {endpoint} échouée: {e}")
        return None
    return service_instance
def _walk_snapshots(snapshot_list, vm, vcenter_name, vcenter_id, vm_moid, parent_path=""):
"""Walk récursif de l'arbre des snapshots d'une VM. Yield des dicts."""
for s in snapshot_list:
snap = s.snapshot # ManagedObject (vim.vm.Snapshot)
snap_moid = snap._moId # string ex 'snapshot-1234'
name = s.name or ""
path = f"{parent_path}/{name}" if parent_path else name
# Création (createTime peut être absent dans certaines versions)
created = getattr(s, "createTime", None)
if created is None:
created_iso = None
age_days = None
else:
try:
if created.tzinfo is None:
created = created.replace(tzinfo=timezone.utc)
created_iso = created.isoformat()
age_days = (datetime.now(timezone.utc) - created).total_seconds() / 86400.0
except Exception:
created_iso = str(created)
age_days = None
# Détection du format géré (PatchCenter ou .exe SLPM)
origin, author = _detect_snap_origin(name)
is_managed = origin is not None
yield {
"vcenter_id": vcenter_id,
"vcenter_name": vcenter_name,
"vm_name": vm.name,
"vm_moid": vm_moid,
"snap_id": snap_moid,
"snap_name": name,
"snap_path": path,
"description": s.description or "",
"created_at": created_iso,
"age_days": round(age_days, 2) if age_days is not None else None,
"author": author,
"origin": origin, # 'patchcenter' | 'slpm' | None
"is_managed_format": is_managed, # any des 2 formats SecOps
"is_patchcenter_format": origin == "patchcenter",
"is_current": bool(getattr(s, "id", None) and vm.snapshot and vm.snapshot.currentSnapshot
and vm.snapshot.currentSnapshot._moId == snap_moid),
}
# Récurse enfants
if s.childSnapshotList:
yield from _walk_snapshots(s.childSnapshotList, vm, vcenter_name, vcenter_id,
vm_moid, parent_path=path)
def list_snapshots(db, vcenter_filter_id: int = None) -> Dict[str, Any]:
    """Scan the active vCenters and return every existing VM snapshot.

    Args:
        db: database session; used to read the vCenter credentials and the
            ``vcenters`` table.
        vcenter_filter_id: when truthy, only the active vCenter with this id
            is scanned; ``None`` (or 0) scans all active vCenters.

    Returns:
        On success: ``{"ok": True, "snapshots": [...], "errors": [...],
        "vcenter_count": int, "snap_count": int}``. Snapshots are sorted
        oldest first; a missing age sorts as 0 days. ``errors`` holds one
        ``{"vcenter", "msg"}`` entry per vCenter that could not be
        connected to or scanned.
        On early failure (pyvmomi missing, credentials missing, no active
        vCenter): ``{"ok": False, "msg": str, "snapshots": [], "errors": []}``.
    """
    if not PYVMOMI_OK:
        return {"ok": False, "msg": "pyvmomi non disponible côté serveur PatchCenter",
                "snapshots": [], "errors": []}
    from sqlalchemy import text as sqlt

    user, pwd = _get_vcenter_creds(db)
    if not user or not pwd:
        return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)",
                "snapshots": [], "errors": []}

    # `where` is built from constant fragments only; the id is a bound parameter.
    where = "WHERE is_active = true"
    params = {}
    if vcenter_filter_id:
        where += " AND id = :id"
        params["id"] = vcenter_filter_id
    vcs = db.execute(sqlt(f"SELECT id, name, endpoint FROM vcenters {where} ORDER BY name"),
                     params).fetchall()
    if not vcs:
        return {"ok": False, "msg": "Aucun vCenter actif", "snapshots": [], "errors": []}

    all_snaps = []
    errors = []
    for vc in vcs:
        si = _connect(vc.endpoint, user, pwd)
        if not si:
            errors.append({"vcenter": vc.name, "msg": "Connexion KO"})
            continue
        try:
            content = si.RetrieveContent()
            container = content.viewManager.CreateContainerView(
                content.rootFolder, [vim.VirtualMachine], True)
            try:
                for vm in container.view:
                    try:
                        # Read vm.snapshot once per VM and reuse it: pyVmomi
                        # resolves managed-object properties lazily, so the
                        # previous three reads per VM were presumably three
                        # round-trips to vCenter.
                        snapshot_info = vm.snapshot
                        if not snapshot_info or not snapshot_info.rootSnapshotList:
                            continue
                        for snap in _walk_snapshots(snapshot_info.rootSnapshotList,
                                                    vm, vc.name, vc.id, vm._moId):
                            all_snaps.append(snap)
                    except Exception as e:
                        # Inaccessible VM / template / invalid config: skip it.
                        log.debug(f"VM scan err on {vc.name}: {e}")
            finally:
                # Always release the server-side container view.
                container.Destroy()
        except Exception as e:
            errors.append({"vcenter": vc.name, "msg": str(e)[:200]})
        finally:
            # Best-effort logout; a failed disconnect must not hide the results.
            try:
                Disconnect(si)
            except Exception:
                pass

    # Sort: oldest snapshot first.
    all_snaps.sort(key=lambda s: (s.get("age_days") or 0), reverse=True)
    return {"ok": True, "snapshots": all_snaps, "errors": errors,
            "vcenter_count": len(vcs), "snap_count": len(all_snaps)}
def delete_snapshot(db, vcenter_id: int, vm_moid: str, snap_id: str,
                    remove_children: bool = False) -> Dict[str, Any]:
    """Delete the snapshot identified by (vcenter_id, vm_moid, snap_id).

    Args:
        db: database session used to look up the vCenter row and credentials.
        vcenter_id: id of the row in the ``vcenters`` table.
        vm_moid: moRef of the owning VM. It is only echoed back in the
            success result; the snapshot is located through ``snap_id`` alone.
        snap_id: moRef of the snapshot to remove.
        remove_children: when True, child snapshots are removed as well.

    Returns:
        ``{"ok": True, "msg", "vcenter", "vm_moid", "snap_id"}`` on success,
        otherwise ``{"ok": False, "msg", ...}``. Exceptions raised while
        talking to vCenter are reported in ``msg`` rather than propagated.

    The removal task is polled once per second until it leaves the
    queued/running states; there is no timeout.
    """
    if not PYVMOMI_OK:
        return {"ok": False, "msg": "pyvmomi non disponible"}
    from sqlalchemy import text as sqlt

    lookup = sqlt("SELECT id, name, endpoint FROM vcenters WHERE id=:id")
    vc = db.execute(lookup, {"id": vcenter_id}).fetchone()
    if not vc:
        return {"ok": False, "msg": f"vCenter id={vcenter_id} introuvable"}

    user, pwd = _get_vcenter_creds(db)
    if not (user and pwd):
        return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)"}

    si = _connect(vc.endpoint, user, pwd)
    if not si:
        return {"ok": False, "msg": f"Connexion vCenter {vc.name} KO"}

    try:
        from pyVmomi import vim as _vim
        import time as _t

        # Rebuild the managed object straight from its moRef.
        snap_mo = vim.vm.Snapshot(snap_id, si._stub)
        task = snap_mo.RemoveSnapshot_Task(removeChildren=remove_children)

        # Wait for the task to reach a terminal state.
        pending_states = (_vim.TaskInfo.State.queued, _vim.TaskInfo.State.running)
        while task.info.state in pending_states:
            _t.sleep(1)

        if task.info.state != _vim.TaskInfo.State.success:
            err = task.info.error
            err_msg = err.msg if err else "task failed"
            return {"ok": False, "msg": f"Échec suppression: {err_msg}",
                    "vcenter": vc.name}
        return {"ok": True, "msg": f"Snapshot supprimé sur {vc.name}",
                "vcenter": vc.name, "vm_moid": vm_moid, "snap_id": snap_id}
    except Exception as e:
        return {"ok": False, "msg": f"Exception: {type(e).__name__}: {e}",
                "vcenter": vc.name}
    finally:
        # Best-effort logout.
        try:
            Disconnect(si)
        except Exception:
            pass