patchcenter/app/services/snapshot_mgmt_service.py
Admin MPCZ 720b0789e6 feat(snapshots): reconnait les formats manuels SANEF + toggle UI 'Tous formats'
- backend: nouveau regex SNAP_MANUAL_RE qui capture le premier token avant
  espace/underscore comme auteur. Permet de classer les snaps style
  'kmoad-ext avant maj' ou 'kmoad-ext s1' en origin='manual' avec auteur
  extrait, au lieu de origin=None.
- frontend: checkbox 'Tous formats' (cochée par défaut) qui inclut les
  snaps manual/slpm/patchcenter. Decoche pour PatchCenter only (ancien
  comportement).
- frontend: filtre intervenant elargi - match aussi sur nom du snap
  (contains) en plus de l'auteur extrait, pour couvrir les snaps dont
  l'auteur est concatene avec d'autres mots.

Resout le cas ou un utilisateur ne voyait qu'1 seul snap PatchCenter
alors qu'il avait des dizaines de snaps crees manuellement (format
'<user> avant maj').
2026-05-18 16:05:25 +02:00

246 lines
9.7 KiB
Python

"""Service de gestion des snapshots VM existants — listing + suppression.
- list_snapshots(db) : itère les vCenters actifs et retourne tous les snapshots
avec leurs métadonnées (VM, nom snap, créé le, taille, description, créateur).
- delete_snapshot(db, vcenter_id, vm_moid, snapshot_id) : supprime un snapshot
spécifique par son moRef.
Identification de l'auteur : on parse le préfixe du nom du snapshot
(format PatchCenter : `<intervenant>_YYYY-MM-DD_avant_patch`).
"""
import re
import ssl
import logging
from datetime import datetime, timezone
from typing import Dict, Any, List
log = logging.getLogger("patchcenter.snapshot_mgmt")
try:
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
PYVMOMI_OK = True
except ImportError:
PYVMOMI_OK = False
log.warning("pyvmomi non disponible — listing snapshots impossible")
# Formats gérés par les outils SecOps SANEF :
# PatchCenter v2 : `<user>_YYYY-MM-DD_HH-MM_avant_patch` (user = login JWT, depuis 2026-05-07)
# PatchCenter v1 : `<auteur>_YYYY-MM-DD_avant_patch` (legacy, basé sur intervenant)
# .exe SLPM : `SLPM_<auteur>_YYYYMMDD_HHMM`
# manuel SANEF : `<auteur>[ _]...` — fallback, premier token = auteur
SNAP_PATCHCENTER_V2_RE = re.compile(
r"^(?P<author>[A-Za-z0-9_\-\.]+)_(?P<date>\d{4}-\d{2}-\d{2})_\d{2}-\d{2}_avant_patch$"
)
SNAP_PATCHCENTER_V1_RE = re.compile(
r"^(?P<author>[A-Za-z0-9_\-\.]+)_(?P<date>\d{4}-\d{2}-\d{2})_avant_patch$"
)
SNAP_SLPM_RE = re.compile(
r"^SLPM_(?P<author>[A-Za-z0-9_\-\.]+)_\d{8}_\d{4}$"
)
SNAP_MANUAL_RE = re.compile(
r"^(?P<author>[A-Za-z0-9][A-Za-z0-9_\-\.]{1,})[\s_]"
)
def _detect_snap_origin(name: str):
"""Renvoie (origin, author) ou (None, None) si format inconnu.
origin in {'patchcenter', 'slpm', 'manual'} ; author = préfixe utilisateur.
PatchCenter v2 (avec heure) testé en premier pour ne pas matcher v1.
'manual' = fallback : premier token (avant espace ou underscore) = auteur."""
n = name or ""
m = SNAP_PATCHCENTER_V2_RE.match(n)
if m:
return "patchcenter", m.group("author")
m = SNAP_PATCHCENTER_V1_RE.match(n)
if m:
return "patchcenter", m.group("author")
m = SNAP_SLPM_RE.match(n)
if m:
return "slpm", m.group("author")
m = SNAP_MANUAL_RE.match(n)
if m:
return "manual", m.group("author")
return None, None
def _get_vcenter_creds(db):
from .secrets_service import get_secret
user = (get_secret(db, "vcenter_user") or get_secret(db, "vsphere_user") or "").strip()
pwd = (get_secret(db, "vcenter_pass") or get_secret(db, "vsphere_pass") or "").strip()
return user, pwd
def _connect(endpoint, user, password):
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return SmartConnect(host=endpoint, user=user, pwd=password, sslContext=ctx)
except Exception as e:
log.warning(f"Connexion vCenter {endpoint} échouée: {e}")
return None
def _walk_snapshots(snapshot_list, vm, vcenter_name, vcenter_id, vm_moid, parent_path=""):
"""Walk récursif de l'arbre des snapshots d'une VM. Yield des dicts."""
for s in snapshot_list:
snap = s.snapshot # ManagedObject (vim.vm.Snapshot)
snap_moid = snap._moId # string ex 'snapshot-1234'
name = s.name or ""
path = f"{parent_path}/{name}" if parent_path else name
# Création (createTime peut être absent dans certaines versions)
created = getattr(s, "createTime", None)
if created is None:
created_iso = None
age_days = None
else:
try:
if created.tzinfo is None:
created = created.replace(tzinfo=timezone.utc)
created_iso = created.isoformat()
age_days = (datetime.now(timezone.utc) - created).total_seconds() / 86400.0
except Exception:
created_iso = str(created)
age_days = None
# Détection du format géré (PatchCenter ou .exe SLPM)
origin, author = _detect_snap_origin(name)
is_managed = origin is not None
yield {
"vcenter_id": vcenter_id,
"vcenter_name": vcenter_name,
"vm_name": vm.name,
"vm_moid": vm_moid,
"snap_id": snap_moid,
"snap_name": name,
"snap_path": path,
"description": s.description or "",
"created_at": created_iso,
"age_days": round(age_days, 2) if age_days is not None else None,
"author": author,
"origin": origin, # 'patchcenter' | 'slpm' | 'manual' | None
"is_managed_format": is_managed, # any des 2 formats SecOps
"is_patchcenter_format": origin == "patchcenter",
"is_current": bool(getattr(s, "id", None) and vm.snapshot and vm.snapshot.currentSnapshot
and vm.snapshot.currentSnapshot._moId == snap_moid),
}
# Récurse enfants
if s.childSnapshotList:
yield from _walk_snapshots(s.childSnapshotList, vm, vcenter_name, vcenter_id,
vm_moid, parent_path=path)
def list_snapshots(db, vcenter_filter_id: int = None) -> Dict[str, Any]:
"""Itère les vCenters actifs et retourne tous les snapshots existants.
Si `vcenter_filter_id` est fourni, ne scanne que ce vCenter.
Retourne {ok, snapshots: list, errors: list}."""
if not PYVMOMI_OK:
return {"ok": False, "msg": "pyvmomi non disponible côté serveur PatchCenter",
"snapshots": [], "errors": []}
from sqlalchemy import text as sqlt
user, pwd = _get_vcenter_creds(db)
if not user or not pwd:
return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)",
"snapshots": [], "errors": []}
where = "WHERE is_active = true"
params = {}
if vcenter_filter_id:
where += " AND id = :id"
params["id"] = vcenter_filter_id
vcs = db.execute(sqlt(f"SELECT id, name, endpoint FROM vcenters {where} ORDER BY name"),
params).fetchall()
if not vcs:
return {"ok": False, "msg": "Aucun vCenter actif", "snapshots": [], "errors": []}
all_snaps = []
errors = []
for vc in vcs:
si = _connect(vc.endpoint, user, pwd)
if not si:
errors.append({"vcenter": vc.name, "msg": "Connexion KO"})
continue
try:
content = si.RetrieveContent()
container = content.viewManager.CreateContainerView(
content.rootFolder, [vim.VirtualMachine], True)
try:
for vm in container.view:
try:
if not vm.snapshot or not vm.snapshot.rootSnapshotList:
continue
for snap in _walk_snapshots(vm.snapshot.rootSnapshotList,
vm, vc.name, vc.id, vm._moId):
all_snaps.append(snap)
except Exception as e:
# VM inaccessible / template / config invalide → ignore
log.debug(f"VM scan err on {vc.name}: {e}")
finally:
container.Destroy()
except Exception as e:
errors.append({"vcenter": vc.name, "msg": str(e)[:200]})
finally:
try:
Disconnect(si)
except Exception:
pass
# Tri : plus ancien en haut
all_snaps.sort(key=lambda s: (s.get("age_days") or 0), reverse=True)
return {"ok": True, "snapshots": all_snaps, "errors": errors,
"vcenter_count": len(vcs), "snap_count": len(all_snaps)}
def delete_snapshot(db, vcenter_id: int, vm_moid: str, snap_id: str,
remove_children: bool = False) -> Dict[str, Any]:
"""Supprime un snapshot identifié par (vcenter_id, vm_moid, snap_id).
remove_children=True supprime aussi les snapshots enfants."""
if not PYVMOMI_OK:
return {"ok": False, "msg": "pyvmomi non disponible"}
from sqlalchemy import text as sqlt
vc = db.execute(sqlt("SELECT id, name, endpoint FROM vcenters WHERE id=:id"),
{"id": vcenter_id}).fetchone()
if not vc:
return {"ok": False, "msg": f"vCenter id={vcenter_id} introuvable"}
user, pwd = _get_vcenter_creds(db)
if not user or not pwd:
return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)"}
si = _connect(vc.endpoint, user, pwd)
if not si:
return {"ok": False, "msg": f"Connexion vCenter {vc.name} KO"}
try:
# On reconstruit le ManagedObject directement par ses moRef
snap_mo = vim.vm.Snapshot(snap_id, si._stub)
task = snap_mo.RemoveSnapshot_Task(removeChildren=remove_children)
# Wait task
from pyVmomi import vim as _vim
while task.info.state in (_vim.TaskInfo.State.queued, _vim.TaskInfo.State.running):
import time as _t
_t.sleep(1)
if task.info.state == _vim.TaskInfo.State.success:
return {"ok": True, "msg": f"Snapshot supprimé sur {vc.name}",
"vcenter": vc.name, "vm_moid": vm_moid, "snap_id": snap_id}
else:
err = task.info.error
err_msg = err.msg if err else "task failed"
return {"ok": False, "msg": f"Échec suppression: {err_msg}",
"vcenter": vc.name}
except Exception as e:
return {"ok": False, "msg": f"Exception: {type(e).__name__}: {e}",
"vcenter": vc.name}
finally:
try:
Disconnect(si)
except Exception:
pass