"""Service de gestion des snapshots VM existants — listing + suppression. - list_snapshots(db) : itère les vCenters actifs et retourne tous les snapshots avec leurs métadonnées (VM, nom snap, créé le, taille, description, créateur). - delete_snapshot(db, vcenter_id, vm_moid, snapshot_id) : supprime un snapshot spécifique par son moRef. Identification de l'auteur : on parse le préfixe du nom du snapshot (format PatchCenter : `_YYYY-MM-DD_avant_patch`). """ import re import ssl import logging from datetime import datetime, timezone from typing import Dict, Any, List log = logging.getLogger("patchcenter.snapshot_mgmt") try: from pyVim.connect import SmartConnect, Disconnect from pyVmomi import vim PYVMOMI_OK = True except ImportError: PYVMOMI_OK = False log.warning("pyvmomi non disponible — listing snapshots impossible") # Formats gérés par les outils SecOps SANEF : # PatchCenter v2 : `_YYYY-MM-DD_HH-MM_avant_patch` (user = login JWT, depuis 2026-05-07) # PatchCenter v1 : `_YYYY-MM-DD_avant_patch` (legacy, basé sur intervenant) # .exe SLPM : `SLPM__YYYYMMDD_HHMM` SNAP_PATCHCENTER_V2_RE = re.compile( r"^(?P[A-Za-z0-9_\-\.]+)_(?P\d{4}-\d{2}-\d{2})_\d{2}-\d{2}_avant_patch$" ) SNAP_PATCHCENTER_V1_RE = re.compile( r"^(?P[A-Za-z0-9_\-\.]+)_(?P\d{4}-\d{2}-\d{2})_avant_patch$" ) SNAP_SLPM_RE = re.compile( r"^SLPM_(?P[A-Za-z0-9_\-\.]+)_\d{8}_\d{4}$" ) def _detect_snap_origin(name: str): """Renvoie (origin, author) ou (None, None) si format inconnu. origin in {'patchcenter', 'slpm'} ; author = préfixe utilisateur. PatchCenter v2 (avec heure) testé en premier pour ne pas matcher v1.""" n = name or "" m = SNAP_PATCHCENTER_V2_RE.match(n) if m: return "patchcenter", m.group("author") m = SNAP_PATCHCENTER_V1_RE.match(n) if m: return "patchcenter", m.group("author") m = SNAP_SLPM_RE.match(n) if m: return "slpm", m.group("author") return None, None def _get_vcenter_creds(db): from .secrets_service import get_secret user = (get_secret(db, "vcenter_user") or get_secret(db, "vsphere_user") or "").strip() pwd = (get_secret(db, "vcenter_pass") or get_secret(db, "vsphere_pass") or "").strip() return user, pwd def _connect(endpoint, user, password): try: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE return SmartConnect(host=endpoint, user=user, pwd=password, sslContext=ctx) except Exception as e: log.warning(f"Connexion vCenter {endpoint} échouée: {e}") return None def _walk_snapshots(snapshot_list, vm, vcenter_name, vcenter_id, vm_moid, parent_path=""): """Walk récursif de l'arbre des snapshots d'une VM. Yield des dicts.""" for s in snapshot_list: snap = s.snapshot # ManagedObject (vim.vm.Snapshot) snap_moid = snap._moId # string ex 'snapshot-1234' name = s.name or "" path = f"{parent_path}/{name}" if parent_path else name # Création (createTime peut être absent dans certaines versions) created = getattr(s, "createTime", None) if created is None: created_iso = None age_days = None else: try: if created.tzinfo is None: created = created.replace(tzinfo=timezone.utc) created_iso = created.isoformat() age_days = (datetime.now(timezone.utc) - created).total_seconds() / 86400.0 except Exception: created_iso = str(created) age_days = None # Détection du format géré (PatchCenter ou .exe SLPM) origin, author = _detect_snap_origin(name) is_managed = origin is not None yield { "vcenter_id": vcenter_id, "vcenter_name": vcenter_name, "vm_name": vm.name, "vm_moid": vm_moid, "snap_id": snap_moid, "snap_name": name, "snap_path": path, "description": s.description or "", "created_at": created_iso, "age_days": round(age_days, 2) if age_days is not None else None, "author": author, "origin": origin, # 'patchcenter' | 'slpm' | None "is_managed_format": is_managed, # any des 2 formats SecOps "is_patchcenter_format": origin == "patchcenter", "is_current": bool(getattr(s, "id", None) and vm.snapshot and vm.snapshot.currentSnapshot and vm.snapshot.currentSnapshot._moId == snap_moid), } # Récurse enfants if s.childSnapshotList: yield from _walk_snapshots(s.childSnapshotList, vm, vcenter_name, vcenter_id, vm_moid, parent_path=path) def list_snapshots(db, vcenter_filter_id: int = None) -> Dict[str, Any]: """Itère les vCenters actifs et retourne tous les snapshots existants. Si `vcenter_filter_id` est fourni, ne scanne que ce vCenter. Retourne {ok, snapshots: list, errors: list}.""" if not PYVMOMI_OK: return {"ok": False, "msg": "pyvmomi non disponible côté serveur PatchCenter", "snapshots": [], "errors": []} from sqlalchemy import text as sqlt user, pwd = _get_vcenter_creds(db) if not user or not pwd: return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)", "snapshots": [], "errors": []} where = "WHERE is_active = true" params = {} if vcenter_filter_id: where += " AND id = :id" params["id"] = vcenter_filter_id vcs = db.execute(sqlt(f"SELECT id, name, endpoint FROM vcenters {where} ORDER BY name"), params).fetchall() if not vcs: return {"ok": False, "msg": "Aucun vCenter actif", "snapshots": [], "errors": []} all_snaps = [] errors = [] for vc in vcs: si = _connect(vc.endpoint, user, pwd) if not si: errors.append({"vcenter": vc.name, "msg": "Connexion KO"}) continue try: content = si.RetrieveContent() container = content.viewManager.CreateContainerView( content.rootFolder, [vim.VirtualMachine], True) try: for vm in container.view: try: if not vm.snapshot or not vm.snapshot.rootSnapshotList: continue for snap in _walk_snapshots(vm.snapshot.rootSnapshotList, vm, vc.name, vc.id, vm._moId): all_snaps.append(snap) except Exception as e: # VM inaccessible / template / config invalide → ignore log.debug(f"VM scan err on {vc.name}: {e}") finally: container.Destroy() except Exception as e: errors.append({"vcenter": vc.name, "msg": str(e)[:200]}) finally: try: Disconnect(si) except Exception: pass # Tri : plus ancien en haut all_snaps.sort(key=lambda s: (s.get("age_days") or 0), reverse=True) return {"ok": True, "snapshots": all_snaps, "errors": errors, "vcenter_count": len(vcs), "snap_count": len(all_snaps)} def delete_snapshot(db, vcenter_id: int, vm_moid: str, snap_id: str, remove_children: bool = False) -> Dict[str, Any]: """Supprime un snapshot identifié par (vcenter_id, vm_moid, snap_id). remove_children=True supprime aussi les snapshots enfants.""" if not PYVMOMI_OK: return {"ok": False, "msg": "pyvmomi non disponible"} from sqlalchemy import text as sqlt vc = db.execute(sqlt("SELECT id, name, endpoint FROM vcenters WHERE id=:id"), {"id": vcenter_id}).fetchone() if not vc: return {"ok": False, "msg": f"vCenter id={vcenter_id} introuvable"} user, pwd = _get_vcenter_creds(db) if not user or not pwd: return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)"} si = _connect(vc.endpoint, user, pwd) if not si: return {"ok": False, "msg": f"Connexion vCenter {vc.name} KO"} try: # On reconstruit le ManagedObject directement par ses moRef snap_mo = vim.vm.Snapshot(snap_id, si._stub) task = snap_mo.RemoveSnapshot_Task(removeChildren=remove_children) # Wait task from pyVmomi import vim as _vim while task.info.state in (_vim.TaskInfo.State.queued, _vim.TaskInfo.State.running): import time as _t _t.sleep(1) if task.info.state == _vim.TaskInfo.State.success: return {"ok": True, "msg": f"Snapshot supprimé sur {vc.name}", "vcenter": vc.name, "vm_moid": vm_moid, "snap_id": snap_id} else: err = task.info.error err_msg = err.msg if err else "task failed" return {"ok": False, "msg": f"Échec suppression: {err_msg}", "vcenter": vc.name} except Exception as e: return {"ok": False, "msg": f"Exception: {type(e).__name__}: {e}", "vcenter": vc.name} finally: try: Disconnect(si) except Exception: pass