diff --git a/app/main.py b/app/main.py index 9c539b6..f28f05a 100644 --- a/app/main.py +++ b/app/main.py @@ -12,7 +12,7 @@ from starlette.middleware.base import BaseHTTPMiddleware from .config import APP_NAME, APP_VERSION from .dependencies import get_current_user, get_user_perms from .database import SessionLocal, SessionLocalDemo -from .routers import auth, dashboard, servers, settings, users, campaigns, planning, planning_import, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications, patch_history, duty +from .routers import auth, dashboard, servers, settings, users, campaigns, planning, planning_import, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications, patch_history, duty, snapshots class PermissionsMiddleware(BaseHTTPMiddleware): @@ -74,6 +74,7 @@ app.include_router(patching.router) app.include_router(patch_history.router) app.include_router(duty.router) app.include_router(applications.router) +app.include_router(snapshots.router) @app.get("/") diff --git a/app/routers/snapshots.py b/app/routers/snapshots.py new file mode 100644 index 0000000..3720787 --- /dev/null +++ b/app/routers/snapshots.py @@ -0,0 +1,121 @@ +"""Router /snapshots — listing + suppression des snapshots VM existants sur les vCenters.""" +import logging +from fastapi import APIRouter, Request, Depends, Form +from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse +from fastapi.templating import Jinja2Templates +from sqlalchemy import text + +from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context +from ..config import APP_NAME + +log = logging.getLogger("patchcenter.snapshots_router") + +router = APIRouter() +templates = Jinja2Templates(directory="app/templates") + + +def _can_view_snaps(perms): + return can_view(perms, "campaigns") or can_view(perms, "planning") or can_view(perms, "settings") + + +def _can_delete_snaps(perms): + return can_edit(perms, "campaigns") or can_edit(perms, "planning") or can_edit(perms, "settings") + + +def _get_user_intervenant_name(db, user): + """Retourne le nom utilisé comme préfixe dans les snapshots PatchCenter de cet utilisateur. + Priorité : settings 'patcher' > username.""" + try: + from ..services.secrets_service import get_secret + patcher = (get_secret(db, "patcher") or "").strip() + if patcher: + return patcher + except Exception: + pass + return user.get("username", "") + + +@router.get("/snapshots", response_class=HTMLResponse) +async def snapshots_page(request: Request, db=Depends(get_db)): + user = get_current_user(request) + if not user: + return RedirectResponse(url="/login") + perms = get_user_perms(db, user) + if not _can_view_snaps(perms): + return RedirectResponse(url="/dashboard") + + intervenant = _get_user_intervenant_name(db, user) + vcenters = db.execute(text( + "SELECT id, name, endpoint FROM vcenters WHERE is_active = true ORDER BY name" + )).fetchall() + + ctx = base_context(request, db, user) + ctx.update({ + "app_name": APP_NAME, + "intervenant_default": intervenant, + "vcenters": vcenters, + "can_delete": _can_delete_snaps(perms), + }) + return templates.TemplateResponse("snapshots.html", ctx) + + +@router.post("/snapshots/list") +async def snapshots_list(request: Request, db=Depends(get_db), + vcenter_id: str = Form("")): + """Endpoint AJAX : liste les snapshots (tous vCenters ou un seul).""" + user = get_current_user(request) + if not user: + return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) + perms = get_user_perms(db, user) + if not _can_view_snaps(perms): + return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) + + from ..services.snapshot_mgmt_service import list_snapshots + vc_id = int(vcenter_id) if vcenter_id and vcenter_id.isdigit() else None + res = list_snapshots(db, vcenter_filter_id=vc_id) + return JSONResponse(res) + + +@router.post("/snapshots/delete") +async def snapshots_delete(request: Request, db=Depends(get_db)): + """Endpoint AJAX : supprime une liste de snapshots. + Body JSON : [{vcenter_id, vm_moid, snap_id}, ...]""" + user = get_current_user(request) + if not user: + return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) + perms = get_user_perms(db, user) + if not _can_delete_snaps(perms): + return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) + + try: + body = await request.json() + except Exception: + return JSONResponse({"ok": False, "msg": "Body JSON invalide"}, status_code=400) + items = body.get("items") if isinstance(body, dict) else None + if not isinstance(items, list) or not items: + return JSONResponse({"ok": False, "msg": "Aucun snapshot ciblé"}, status_code=400) + + from ..services.snapshot_mgmt_service import delete_snapshot + results = [] + n_ok = 0 + for it in items: + try: + r = delete_snapshot( + db, + vcenter_id=int(it["vcenter_id"]), + vm_moid=str(it["vm_moid"]), + snap_id=str(it["snap_id"]), + remove_children=bool(it.get("remove_children", False)), + ) + except Exception as e: + r = {"ok": False, "msg": f"Param error: {e}"} + r["vm_name"] = it.get("vm_name", "?") + r["snap_name"] = it.get("snap_name", "?") + results.append(r) + if r.get("ok"): + n_ok += 1 + return JSONResponse({ + "ok": n_ok == len(items), + "summary": f"{n_ok}/{len(items)} snapshots supprimés", + "results": results, + }) diff --git a/app/services/snapshot_mgmt_service.py b/app/services/snapshot_mgmt_service.py new file mode 100644 index 0000000..e3ec435 --- /dev/null +++ b/app/services/snapshot_mgmt_service.py @@ -0,0 +1,209 @@ +"""Service de gestion des snapshots VM existants — listing + suppression. + +- list_snapshots(db) : itère les vCenters actifs et retourne tous les snapshots + avec leurs métadonnées (VM, nom snap, créé le, taille, description, créateur). +- delete_snapshot(db, vcenter_id, vm_moid, snapshot_id) : supprime un snapshot + spécifique par son moRef. + +Identification de l'auteur : on parse le préfixe du nom du snapshot +(format PatchCenter : `_YYYY-MM-DD_avant_patch`). +""" +import re +import ssl +import logging +from datetime import datetime, timezone +from typing import Dict, Any, List + +log = logging.getLogger("patchcenter.snapshot_mgmt") + +try: + from pyVim.connect import SmartConnect, Disconnect + from pyVmomi import vim + PYVMOMI_OK = True +except ImportError: + PYVMOMI_OK = False + log.warning("pyvmomi non disponible — listing snapshots impossible") + + +SNAP_NAME_RE = re.compile( + r"^(?P[A-Za-z0-9_\-\.]+)_(?P\d{4}-\d{2}-\d{2})(?:_(?P.+))?$" +) + + +def _get_vcenter_creds(db): + from .secrets_service import get_secret + user = (get_secret(db, "vcenter_user") or get_secret(db, "vsphere_user") or "").strip() + pwd = (get_secret(db, "vcenter_pass") or get_secret(db, "vsphere_pass") or "").strip() + return user, pwd + + +def _connect(endpoint, user, password): + try: + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return SmartConnect(host=endpoint, user=user, pwd=password, sslContext=ctx) + except Exception as e: + log.warning(f"Connexion vCenter {endpoint} échouée: {e}") + return None + + +def _walk_snapshots(snapshot_list, vm, vcenter_name, vcenter_id, vm_moid, parent_path=""): + """Walk récursif de l'arbre des snapshots d'une VM. Yield des dicts.""" + for s in snapshot_list: + snap = s.snapshot # ManagedObject (vim.vm.Snapshot) + snap_moid = snap._moId # string ex 'snapshot-1234' + name = s.name or "" + path = f"{parent_path}/{name}" if parent_path else name + + # Création (createTime peut être absent dans certaines versions) + created = getattr(s, "createTime", None) + if created is None: + created_iso = None + age_days = None + else: + try: + if created.tzinfo is None: + created = created.replace(tzinfo=timezone.utc) + created_iso = created.isoformat() + age_days = (datetime.now(timezone.utc) - created).total_seconds() / 86400.0 + except Exception: + created_iso = str(created) + age_days = None + + # Auteur déduit du préfixe du nom (format PatchCenter) + author = None + m = SNAP_NAME_RE.match(name) + if m: + author = m.group("author") + + yield { + "vcenter_id": vcenter_id, + "vcenter_name": vcenter_name, + "vm_name": vm.name, + "vm_moid": vm_moid, + "snap_id": snap_moid, + "snap_name": name, + "snap_path": path, + "description": s.description or "", + "created_at": created_iso, + "age_days": round(age_days, 2) if age_days is not None else None, + "author": author, + "is_current": bool(getattr(s, "id", None) and vm.snapshot and vm.snapshot.currentSnapshot + and vm.snapshot.currentSnapshot._moId == snap_moid), + } + + # Récurse enfants + if s.childSnapshotList: + yield from _walk_snapshots(s.childSnapshotList, vm, vcenter_name, vcenter_id, + vm_moid, parent_path=path) + + +def list_snapshots(db, vcenter_filter_id: int = None) -> Dict[str, Any]: + """Itère les vCenters actifs et retourne tous les snapshots existants. + + Si `vcenter_filter_id` est fourni, ne scanne que ce vCenter. + Retourne {ok, snapshots: list, errors: list}.""" + if not PYVMOMI_OK: + return {"ok": False, "msg": "pyvmomi non disponible côté serveur PatchCenter", + "snapshots": [], "errors": []} + + from sqlalchemy import text as sqlt + user, pwd = _get_vcenter_creds(db) + if not user or not pwd: + return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)", + "snapshots": [], "errors": []} + + where = "WHERE is_active = true" + params = {} + if vcenter_filter_id: + where += " AND id = :id" + params["id"] = vcenter_filter_id + vcs = db.execute(sqlt(f"SELECT id, name, endpoint FROM vcenters {where} ORDER BY name"), + params).fetchall() + if not vcs: + return {"ok": False, "msg": "Aucun vCenter actif", "snapshots": [], "errors": []} + + all_snaps = [] + errors = [] + + for vc in vcs: + si = _connect(vc.endpoint, user, pwd) + if not si: + errors.append({"vcenter": vc.name, "msg": "Connexion KO"}) + continue + try: + content = si.RetrieveContent() + container = content.viewManager.CreateContainerView( + content.rootFolder, [vim.VirtualMachine], True) + try: + for vm in container.view: + try: + if not vm.snapshot or not vm.snapshot.rootSnapshotList: + continue + for snap in _walk_snapshots(vm.snapshot.rootSnapshotList, + vm, vc.name, vc.id, vm._moId): + all_snaps.append(snap) + except Exception as e: + # VM inaccessible / template / config invalide → ignore + log.debug(f"VM scan err on {vc.name}: {e}") + finally: + container.Destroy() + except Exception as e: + errors.append({"vcenter": vc.name, "msg": str(e)[:200]}) + finally: + try: + Disconnect(si) + except Exception: + pass + + # Tri : plus ancien en haut + all_snaps.sort(key=lambda s: (s.get("age_days") or 0), reverse=True) + return {"ok": True, "snapshots": all_snaps, "errors": errors, + "vcenter_count": len(vcs), "snap_count": len(all_snaps)} + + +def delete_snapshot(db, vcenter_id: int, vm_moid: str, snap_id: str, + remove_children: bool = False) -> Dict[str, Any]: + """Supprime un snapshot identifié par (vcenter_id, vm_moid, snap_id). + remove_children=True supprime aussi les snapshots enfants.""" + if not PYVMOMI_OK: + return {"ok": False, "msg": "pyvmomi non disponible"} + + from sqlalchemy import text as sqlt + vc = db.execute(sqlt("SELECT id, name, endpoint FROM vcenters WHERE id=:id"), + {"id": vcenter_id}).fetchone() + if not vc: + return {"ok": False, "msg": f"vCenter id={vcenter_id} introuvable"} + user, pwd = _get_vcenter_creds(db) + if not user or not pwd: + return {"ok": False, "msg": "Credentials vCenter manquants (Settings > vSphere)"} + + si = _connect(vc.endpoint, user, pwd) + if not si: + return {"ok": False, "msg": f"Connexion vCenter {vc.name} KO"} + try: + # On reconstruit le ManagedObject directement par ses moRef + snap_mo = vim.vm.Snapshot(snap_id, si._stub) + task = snap_mo.RemoveSnapshot_Task(removeChildren=remove_children) + # Wait task + from pyVmomi import vim as _vim + while task.info.state in (_vim.TaskInfo.State.queued, _vim.TaskInfo.State.running): + import time as _t + _t.sleep(1) + if task.info.state == _vim.TaskInfo.State.success: + return {"ok": True, "msg": f"Snapshot supprimé sur {vc.name}", + "vcenter": vc.name, "vm_moid": vm_moid, "snap_id": snap_id} + else: + err = task.info.error + err_msg = err.msg if err else "task failed" + return {"ok": False, "msg": f"Échec suppression: {err_msg}", + "vcenter": vc.name} + except Exception as e: + return {"ok": False, "msg": f"Exception: {type(e).__name__}: {e}", + "vcenter": vc.name} + finally: + try: + Disconnect(si) + except Exception: + pass diff --git a/app/templates/base.html b/app/templates/base.html index 2facf18..60589fd 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -94,6 +94,7 @@ {% if p.servers in ('edit','admin') or p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}Config exclusions{% endif %} {% if p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}Validations{% endif %} Historique + {% if p.campaigns or p.planning or p.settings %}📸 Snapshots VM{% endif %} Tour de garde {# Quickwin sous-groupe #} diff --git a/app/templates/snapshots.html b/app/templates/snapshots.html new file mode 100644 index 0000000..ff02df3 --- /dev/null +++ b/app/templates/snapshots.html @@ -0,0 +1,237 @@ +{% extends 'base.html' %} +{% block title %}Snapshots VM{% endblock %} +{% block content %} +
+
+

Gestion des snapshots VM

+

+ Liste les snapshots existants sur les vCenters configurés. Filtre par auteur (préfixe nom = intervenant) et âge. + La suppression est définitive — confirmation requise. +

+
+
+ + + +
+
+
+ + +
+
+ + + +
+
+ + +
+
+ + {% if can_delete %} + + {% endif %} +
+
+
+ +
Cliquer "Charger" pour scanner les vCenters.
+ +
+ + + + + + + + + + + + + + +
vCenterVMSnapshotAuteurCréé leÂgeDescription
+
+ + +{% endblock %}