"""Router Patching — exclusions, correspondance prod↔hors-prod, validations.""" from fastapi import APIRouter, Request, Depends, Query, Form from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates from sqlalchemy import text from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context from ..services import correspondance_service as corr from ..config import APP_NAME router = APIRouter() templates = Jinja2Templates(directory="app/templates") def _can_edit_excludes(perms): """Peut éditer les exclusions : admin, coordinator, operator (pas viewer).""" return can_edit(perms, "servers") or can_edit(perms, "campaigns") or can_edit(perms, "quickwin") @router.get("/patching/config-exclusions", response_class=HTMLResponse) async def config_exclusions_page(request: Request, db=Depends(get_db), search: str = Query(""), domain: str = Query(""), env: str = Query(""), zone: str = Query(""), tier: str = Query(""), os: str = Query(""), application: str = Query(""), has_excludes: str = Query(""), page: int = Query(1), per_page: int = Query(30)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return RedirectResponse(url="/dashboard") # Requête principale where = ["1=1"] params = {} if search: where.append("s.hostname ILIKE :s"); params["s"] = f"%{search}%" if domain: where.append("d.code = :d"); params["d"] = domain if env: where.append("e.code = :e"); params["e"] = env if zone: where.append("z.name = :z"); params["z"] = zone if tier: where.append("s.tier = :t"); params["t"] = tier if os: where.append("s.os_family = :o"); params["o"] = os if application: where.append("s.application_name = :app"); params["app"] = application if has_excludes == "yes": where.append("s.patch_excludes IS NOT NULL AND s.patch_excludes != ''") elif has_excludes == "no": where.append("(s.patch_excludes IS NULL OR s.patch_excludes = '')") wc = " AND ".join(where) # Count total = db.execute(text(f""" SELECT COUNT(*) FROM servers s LEFT JOIN domain_environments de ON s.domain_env_id = de.id LEFT JOIN domains d ON de.domain_id = d.id LEFT JOIN environments e ON de.environment_id = e.id LEFT JOIN zones z ON s.zone_id = z.id WHERE {wc} """), params).scalar() or 0 per_page = max(10, min(per_page, 200)) total_pages = max(1, (total + per_page - 1) // per_page) page = max(1, min(page, total_pages)) offset = (page - 1) * per_page rows = db.execute(text(f""" SELECT s.id, s.hostname, s.os_family, s.os_version, s.tier, s.etat, s.patch_excludes, s.application_name, d.name as domain_name, d.code as domain_code, e.name as env_name, e.code as env_code, z.name as zone_name FROM servers s LEFT JOIN domain_environments de ON s.domain_env_id = de.id LEFT JOIN domains d ON de.domain_id = d.id LEFT JOIN environments e ON de.environment_id = e.id LEFT JOIN zones z ON s.zone_id = z.id WHERE {wc} ORDER BY s.hostname LIMIT :limit OFFSET :offset """), {**params, "limit": per_page, "offset": offset}).fetchall() # Listes pour filtres domains = db.execute(text("SELECT code, name FROM domains ORDER BY name")).fetchall() envs = db.execute(text("SELECT code, name FROM environments ORDER BY name")).fetchall() zones = db.execute(text("SELECT DISTINCT name FROM zones ORDER BY name")).fetchall() applications = db.execute(text("""SELECT application_name, COUNT(*) as c FROM servers WHERE application_name IS NOT NULL AND application_name != '' GROUP BY application_name ORDER BY application_name""")).fetchall() all_apps = db.execute(text("""SELECT id, nom_court FROM applications WHERE itop_id IS NOT NULL ORDER BY nom_court""")).fetchall() # Stats globales stats = { "total_servers": db.execute(text("SELECT COUNT(*) FROM servers")).scalar(), "with_excludes": db.execute(text("SELECT COUNT(*) FROM servers WHERE patch_excludes IS NOT NULL AND patch_excludes != ''")).scalar(), } ctx = base_context(request, db, user) ctx.update({ "app_name": APP_NAME, "servers": rows, "total": total, "page": page, "per_page": per_page, "total_pages": total_pages, "filters": {"search": search, "domain": domain, "env": env, "zone": zone, "tier": tier, "os": os, "application": application, "has_excludes": has_excludes}, "domains": domains, "envs": envs, "zones": [z.name for z in zones], "applications": applications, "all_apps": all_apps, "stats": stats, "msg": request.query_params.get("msg"), }) return templates.TemplateResponse("patching_config_exclusions.html", ctx) @router.post("/patching/config-exclusions/{server_id}/save") async def save_server_excludes(request: Request, server_id: int, db=Depends(get_db), patch_excludes: str = Form("")): """Enregistre les exclusions d'un serveur + push iTop.""" from fastapi.responses import JSONResponse user = get_current_user(request) if not user: return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) # Normalise : split sur espaces/newlines, dédoublonne, rejoint avec un espace parts = [p.strip() for p in patch_excludes.replace("\n", " ").replace("\t", " ").split() if p.strip()] seen = set() cleaned = [] for p in parts: if p not in seen: seen.add(p) cleaned.append(p) new_val = " ".join(cleaned) srv = db.execute(text("SELECT id, hostname FROM servers WHERE id=:id"), {"id": server_id}).fetchone() if not srv: return JSONResponse({"ok": False, "msg": "Serveur introuvable"}, status_code=404) # 1. Maj base locale db.execute(text("UPDATE servers SET patch_excludes=:pe, updated_at=NOW() WHERE id=:id"), {"pe": new_val, "id": server_id}) db.commit() # 2. Push iTop immédiat (best effort) itop_result = {"pushed": False, "msg": ""} try: from ..services.itop_service import ITopClient from ..services.secrets_service import get_secret url = get_secret(db, "itop_url") u = get_secret(db, "itop_user") p = get_secret(db, "itop_pass") if url and u and p: client = ITopClient(url, u, p) r = client._call("core/get", **{"class": "VirtualMachine", "key": f'SELECT VirtualMachine WHERE name = "{srv.hostname}"', "output_fields": "name"}) if r.get("objects"): vm_id = list(r["objects"].values())[0]["key"] upd = client.update("VirtualMachine", vm_id, {"patch_excludes": new_val}) if upd.get("code") == 0: itop_result = {"pushed": True, "msg": "Poussé vers iTop"} else: itop_result = {"pushed": False, "msg": f"iTop: {upd.get('message','')[:80]}"} except Exception as e: itop_result = {"pushed": False, "msg": f"iTop error: {str(e)[:80]}"} return JSONResponse({"ok": True, "patch_excludes": new_val, "itop": itop_result}) @router.post("/patching/config-exclusions/bulk") async def bulk_update_excludes(request: Request, db=Depends(get_db)): """Bulk add/remove pattern sur plusieurs serveurs.""" from fastapi.responses import JSONResponse user = get_current_user(request) if not user: return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) body = await request.json() server_ids = body.get("server_ids", []) pattern = (body.get("pattern") or "").strip() action = body.get("action", "add") # "add" | "remove" | "replace" if not server_ids or not pattern and action != "replace": return JSONResponse({"ok": False, "msg": "Paramètres manquants"}) ids = [int(x) for x in server_ids if str(x).isdigit()] if not ids: return JSONResponse({"ok": False, "msg": "Aucun serveur valide"}) placeholders = ",".join(str(i) for i in ids) rows = db.execute(text(f"SELECT id, hostname, patch_excludes FROM servers WHERE id IN ({placeholders})")).fetchall() updated = 0 for r in rows: current = (r.patch_excludes or "").strip() parts = current.split() if current else [] if action == "add": if pattern not in parts: parts.append(pattern) elif action == "remove": parts = [p for p in parts if p != pattern] elif action == "replace": parts = pattern.split() new_val = " ".join(parts) if new_val != current: db.execute(text("UPDATE servers SET patch_excludes=:pe, updated_at=NOW() WHERE id=:id"), {"pe": new_val, "id": r.id}) updated += 1 db.commit() # Push iTop en batch (best effort, async conceptually) itop_pushed = 0 itop_errors = 0 try: from ..services.itop_service import ITopClient from ..services.secrets_service import get_secret url = get_secret(db, "itop_url") u = get_secret(db, "itop_user") p = get_secret(db, "itop_pass") if url and u and p: client = ITopClient(url, u, p) # Refresh après update rows2 = db.execute(text(f"SELECT hostname, patch_excludes FROM servers WHERE id IN ({placeholders})")).fetchall() for r in rows2: resp = client._call("core/get", **{"class": "VirtualMachine", "key": f'SELECT VirtualMachine WHERE name = "{r.hostname}"', "output_fields": "name"}) if resp.get("objects"): vm_id = list(resp["objects"].values())[0]["key"] up = client.update("VirtualMachine", vm_id, {"patch_excludes": r.patch_excludes or ""}) if up.get("code") == 0: itop_pushed += 1 else: itop_errors += 1 except Exception: pass return JSONResponse({"ok": True, "updated": updated, "itop_pushed": itop_pushed, "itop_errors": itop_errors}) @router.post("/patching/config-exclusions/bulk-application") async def bulk_update_application(request: Request, db=Depends(get_db)): """Bulk changement de solution applicative sur plusieurs serveurs.""" from fastapi.responses import JSONResponse user = get_current_user(request) if not user: return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) body = await request.json() server_ids = body.get("server_ids", []) application_id = body.get("application_id") # int ou None/"" pour désassocier ids = [int(x) for x in server_ids if str(x).isdigit()] if not ids: return JSONResponse({"ok": False, "msg": "Aucun serveur"}) app_id_val = None app_itop_id = None app_name = None if application_id and str(application_id).strip().isdigit(): app_id_val = int(application_id) row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"), {"id": app_id_val}).fetchone() if row: app_itop_id = row.itop_id app_name = row.nom_court else: return JSONResponse({"ok": False, "msg": "Application introuvable"}) placeholders = ",".join(str(i) for i in ids) db.execute(text(f"""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW() WHERE id IN ({placeholders})"""), {"aid": app_id_val, "an": app_name}) db.commit() updated = len(ids) # Push iTop itop_pushed = 0 itop_errors = 0 try: from ..services.itop_service import ITopClient from ..services.secrets_service import get_secret url = get_secret(db, "itop_url") u = get_secret(db, "itop_user") p = get_secret(db, "itop_pass") if url and u and p: client = ITopClient(url, u, p) new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else [] hosts = db.execute(text(f"SELECT hostname FROM servers WHERE id IN ({placeholders})")).fetchall() for h in hosts: try: rr = client._call("core/get", **{"class": "VirtualMachine", "key": f'SELECT VirtualMachine WHERE name = "{h.hostname}"', "output_fields": "name"}) if rr.get("objects"): vm_id = list(rr["objects"].values())[0]["key"] up = client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list}) if up.get("code") == 0: itop_pushed += 1 else: itop_errors += 1 except Exception: itop_errors += 1 except Exception: pass return JSONResponse({"ok": True, "updated": updated, "itop_pushed": itop_pushed, "itop_errors": itop_errors, "app_name": app_name or "(aucune)"}) # ═══════════════════════════════════════════════════════ # Correspondance prod ↔ hors-prod # ═══════════════════════════════════════════════════════ @router.get("/patching/correspondance", response_class=HTMLResponse) async def correspondance_page(request: Request, db=Depends(get_db), search: str = Query(""), application: str = Query(""), domain: str = Query(""), env: str = Query("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not _can_edit_excludes(perms) and not can_view(perms, "campaigns"): return RedirectResponse(url="/dashboard") servers = corr.get_servers_for_builder(db, search=search, app=application, domain=domain, env=env) applications = db.execute(text("""SELECT DISTINCT application_name FROM servers WHERE application_name IS NOT NULL AND application_name != '' ORDER BY application_name""")).fetchall() envs = db.execute(text("SELECT DISTINCT name FROM environments ORDER BY name")).fetchall() domains = db.execute(text("SELECT DISTINCT name FROM domains ORDER BY name")).fetchall() all_apps = db.execute(text("""SELECT id, nom_court FROM applications WHERE itop_id IS NOT NULL ORDER BY nom_court""")).fetchall() # Stats globales stats = { "total_links": db.execute(text("SELECT COUNT(*) FROM server_correspondance")).scalar() or 0, "filtered": len(servers), } ctx = base_context(request, db, user) ctx.update({"app_name": APP_NAME, "servers": servers, "stats": stats, "applications": applications, "envs": [e.name for e in envs], "domains": [d.name for d in domains], "all_apps": all_apps, "search": search, "application": application, "domain": domain, "env": env, "can_edit": _can_edit_excludes(perms), "msg": request.query_params.get("msg", "")}) return templates.TemplateResponse("patching_correspondance.html", ctx) @router.post("/patching/correspondance/bulk-env") async def correspondance_bulk_env(request: Request, db=Depends(get_db)): """Change l'environnement réel de N serveurs (PatchCenter + push iTop).""" user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) body = await request.json() server_ids = [int(x) for x in body.get("server_ids", []) if str(x).isdigit()] env_name = (body.get("env_name") or "").strip() if not server_ids or not env_name: return JSONResponse({"ok": False, "msg": "Paramètres manquants"}) # Trouver env_id env_row = db.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": env_name}).fetchone() if not env_row: return JSONResponse({"ok": False, "msg": f"Env '{env_name}' introuvable"}) env_id = env_row.id placeholders = ",".join(str(i) for i in server_ids) # Pour chaque serveur : trouver/créer le domain_env correspondant et l'affecter updated = 0 srvs = db.execute(text(f"""SELECT s.id, s.hostname, s.domain_env_id, de.domain_id FROM servers s LEFT JOIN domain_environments de ON s.domain_env_id = de.id WHERE s.id IN ({placeholders})""")).fetchall() for s in srvs: if not s.domain_id: continue # pas de domaine actuel, skip # Trouver ou créer domain_environments(domain_id, env_id) de = db.execute(text("""SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e"""), {"d": s.domain_id, "e": env_id}).fetchone() if not de: db.execute(text("""INSERT INTO domain_environments (domain_id, environment_id) VALUES (:d, :e)"""), {"d": s.domain_id, "e": env_id}) db.commit() de = db.execute(text("""SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e"""), {"d": s.domain_id, "e": env_id}).fetchone() db.execute(text("UPDATE servers SET domain_env_id=:de_id, updated_at=NOW() WHERE id=:sid"), {"de_id": de.id, "sid": s.id}) updated += 1 db.commit() # Push iTop itop_pushed = 0 itop_errors = 0 try: from ..services.itop_service import ITopClient from ..services.secrets_service import get_secret url = get_secret(db, "itop_url") u = get_secret(db, "itop_user") p = get_secret(db, "itop_pass") if url and u and p: client = ITopClient(url, u, p) for s in srvs: try: rr = client._call("core/get", **{"class": "VirtualMachine", "key": f'SELECT VirtualMachine WHERE name = "{s.hostname}"', "output_fields": "name"}) if rr.get("objects"): vm_id = list(rr["objects"].values())[0]["key"] upd = client.update("VirtualMachine", vm_id, { "environnement_id": f"SELECT Environnement WHERE name = '{env_name}'" }) if upd.get("code") == 0: itop_pushed += 1 else: itop_errors += 1 except Exception: itop_errors += 1 except Exception: pass return JSONResponse({"ok": True, "updated": updated, "itop_pushed": itop_pushed, "itop_errors": itop_errors, "env_name": env_name}) @router.post("/patching/correspondance/bulk-application") async def correspondance_bulk_app(request: Request, db=Depends(get_db)): """Change la solution applicative de N serveurs (PatchCenter + push iTop).""" user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) body = await request.json() server_ids = [int(x) for x in body.get("server_ids", []) if str(x).isdigit()] application_id = body.get("application_id") if not server_ids: return JSONResponse({"ok": False, "msg": "Aucun serveur"}) app_id_val = None app_itop_id = None app_name = None if application_id and str(application_id).strip().isdigit(): app_id_val = int(application_id) row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"), {"id": app_id_val}).fetchone() if row: app_itop_id = row.itop_id app_name = row.nom_court else: return JSONResponse({"ok": False, "msg": "Application introuvable"}) placeholders = ",".join(str(i) for i in server_ids) db.execute(text(f"""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW() WHERE id IN ({placeholders})"""), {"aid": app_id_val, "an": app_name}) db.commit() itop_pushed = 0 itop_errors = 0 try: from ..services.itop_service import ITopClient from ..services.secrets_service import get_secret url = get_secret(db, "itop_url") u = get_secret(db, "itop_user") p = get_secret(db, "itop_pass") if url and u and p: client = ITopClient(url, u, p) new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else [] hosts = db.execute(text(f"SELECT hostname FROM servers WHERE id IN ({placeholders})")).fetchall() for h in hosts: try: rr = client._call("core/get", **{"class": "VirtualMachine", "key": f'SELECT VirtualMachine WHERE name = "{h.hostname}"', "output_fields": "name"}) if rr.get("objects"): vm_id = list(rr["objects"].values())[0]["key"] up = client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list}) if up.get("code") == 0: itop_pushed += 1 else: itop_errors += 1 except Exception: itop_errors += 1 except Exception: pass return JSONResponse({"ok": True, "updated": len(server_ids), "itop_pushed": itop_pushed, "itop_errors": itop_errors, "app_name": app_name or "(aucune)"}) @router.post("/patching/correspondance/bulk-create") async def correspondance_bulk_create(request: Request, db=Depends(get_db)): user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) body = await request.json() prod_ids = [int(x) for x in body.get("prod_ids", []) if str(x).isdigit()] nonprod_ids = [int(x) for x in body.get("nonprod_ids", []) if str(x).isdigit()] env_labels = body.get("env_labels", {}) if not prod_ids or not nonprod_ids: return JSONResponse({"ok": False, "msg": "Au moins 1 prod et 1 non-prod requis"}) r = corr.bulk_create_correspondance(db, prod_ids, nonprod_ids, env_labels, user.get("uid")) return JSONResponse({"ok": True, **r}) @router.post("/patching/correspondance/auto-detect") async def correspondance_auto_detect(request: Request, db=Depends(get_db)): user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) stats = corr.detect_correspondances(db) return JSONResponse({"ok": True, **{k: v for k, v in stats.items() if k != "plan"}}) @router.post("/patching/correspondance/link") async def correspondance_link(request: Request, db=Depends(get_db), prod_id: int = Form(...), nonprod_id: int = Form(...), env_code: str = Form(""), note: str = Form("")): user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) corr.create_manual_link(db, prod_id, nonprod_id, env_code, note, user.get("uid")) return JSONResponse({"ok": True}) @router.post("/patching/correspondance/link-by-host") async def correspondance_link_by_host(request: Request, db=Depends(get_db), prod_id: int = Form(...), nonprod_hostname: str = Form(...), env_code: str = Form(""), note: str = Form("")): user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) row = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), {"h": nonprod_hostname.strip()}).fetchone() if not row: return JSONResponse({"ok": False, "msg": f"Serveur '{nonprod_hostname}' introuvable"}) corr.create_manual_link(db, prod_id, row.id, env_code, note, user.get("uid")) return JSONResponse({"ok": True}) @router.post("/patching/correspondance/{corr_id}/delete") async def correspondance_delete(request: Request, corr_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) corr.delete_link(db, corr_id) return JSONResponse({"ok": True}) # ═══════════════════════════════════════════════════════ # Validations post-patching # ═══════════════════════════════════════════════════════ @router.get("/patching/validations", response_class=HTMLResponse) async def validations_page(request: Request, db=Depends(get_db), status: str = Query("en_attente"), campaign_id: int = Query(None), env: str = Query("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return RedirectResponse(url="/dashboard") validations = corr.get_pending_validations(db, env=env, campaign_id=campaign_id, status=status) # Contacts validateurs (responsables + référents) contacts = db.execute(text("""SELECT id, name, email, role, team FROM contacts WHERE is_active=true AND role IN ('responsable_applicatif','responsable_domaine','referent_technique','ra_prod','ra_test') ORDER BY name""")).fetchall() stats = { "en_attente": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='en_attente'")).scalar() or 0, "validated_ok": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='validated_ok'")).scalar() or 0, "validated_ko": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='validated_ko'")).scalar() or 0, "forced": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='forced'")).scalar() or 0, } envs = db.execute(text("SELECT DISTINCT name FROM environments ORDER BY name")).fetchall() ctx = base_context(request, db, user) ctx.update({"app_name": APP_NAME, "validations": validations, "contacts": contacts, "stats": stats, "status": status, "campaign_id": campaign_id, "env": env, "envs": [e.name for e in envs], "can_force": can_edit(perms, "campaigns") or user.get("role") == "admin", "msg": request.query_params.get("msg", "")}) return templates.TemplateResponse("patching_validations.html", ctx) @router.post("/patching/validations/mark") async def validations_mark(request: Request, db=Depends(get_db)): user = get_current_user(request) if not user: return JSONResponse({"ok": False}, status_code=401) perms = get_user_perms(db, user) if not _can_edit_excludes(perms): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) body = await request.json() ids = body.get("validation_ids", []) status = body.get("status", "validated_ok") contact_id = body.get("contact_id") forced_reason = body.get("forced_reason", "") notes = body.get("notes", "") if status not in ("validated_ok", "validated_ko", "forced"): return JSONResponse({"ok": False, "msg": "Status invalide"}) if status == "forced" and not forced_reason.strip(): return JSONResponse({"ok": False, "msg": "Raison obligatoire pour forcer"}) if status in ("validated_ok", "validated_ko") and not contact_id: return JSONResponse({"ok": False, "msg": "Validateur obligatoire"}) validator_name = None if contact_id: row = db.execute(text("SELECT name FROM contacts WHERE id=:id"), {"id": int(contact_id)}).fetchone() if row: validator_name = row.name n = corr.mark_validation(db, ids, status, contact_id, validator_name, forced_reason, notes, user.get("uid")) return JSONResponse({"ok": True, "updated": n}) @router.get("/patching/validations/history/{server_id}", response_class=HTMLResponse) async def validations_history(request: Request, server_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") srv = db.execute(text("SELECT id, hostname FROM servers WHERE id=:id"), {"id": server_id}).fetchone() if not srv: return HTMLResponse("Serveur introuvable", status_code=404) history = corr.get_validation_history(db, server_id) ctx = base_context(request, db, user) ctx.update({"app_name": APP_NAME, "server": srv, "history": history}) return templates.TemplateResponse("patching_validations_history.html", ctx)