patchcenter/app/routers/audit.py
Khalid MOUTAOUAKIL ba8a969366 Campagnes: workflow complet, audit serveurs, assignation operateurs
- Workflow: draft → pending_validation (COMEP) → planned → in_progress → completed
- Prereqs auto: SSH, disque (1.2Go /, 800Mo /var), satellite
- Assignation: operateurs prennent/liberent, coordinateur assigne/force
- Limites par operateur par campagne (max_servers + raison)
- Default intervenant permanent par serveur (auto-assigne)
- Planning jours: lun+mar hors-prod, mer+jeu prod, jamais vendredi
- Preferences serveur: pref_patch_jour, pref_patch_heure (permanents)
- Audit serveurs: import Excel, 29 colonnes, KPIs, detail HTMX
- Jours en francais (Lun, Mar, Mer, Jeu)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 13:06:08 +02:00

85 lines
3.4 KiB
Python

"""Router audit serveurs — resultats des scans d'audit"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
@router.get("/audit", response_class=HTMLResponse)
async def audit_page(request: Request, db=Depends(get_db),
filter: str = Query(None), search: str = Query(None)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
where = ["1=1"]
params = {}
if filter == "failed":
where.append("sa.status = 'CONNECTION_FAILED'")
elif filter == "disk":
where.append("sa.disk_alert = true")
elif filter == "no_qualys":
where.append("sa.qualys_active = false AND sa.status = 'OK'")
elif filter == "no_s1":
where.append("sa.sentinelone_active = false AND sa.status = 'OK'")
elif filter == "no_autostart":
where.append("sa.running_not_enabled IS NOT NULL AND sa.running_not_enabled != '' AND sa.status = 'OK'")
elif filter == "failed_svc":
where.append("sa.failed_services IS NOT NULL AND sa.failed_services != '' AND sa.status = 'OK'")
if search:
where.append("sa.hostname ILIKE :q")
params["q"] = f"%{search}%"
wc = " AND ".join(where)
entries = db.execute(text(f"""
SELECT sa.*, d.name as domaine, e.name as environnement
FROM server_audit sa
LEFT JOIN servers s ON sa.server_id = s.id
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
LEFT JOIN domains d ON de.domain_id = d.id
LEFT JOIN environments e ON de.environment_id = e.id
WHERE {wc}
ORDER BY sa.disk_alert DESC, sa.status, sa.hostname
LIMIT 500
"""), params).fetchall()
# Stats
stats = db.execute(text("""
SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'OK') as ok,
COUNT(*) FILTER (WHERE status = 'CONNECTION_FAILED') as failed,
COUNT(*) FILTER (WHERE disk_alert = true) as disk_alerts,
COUNT(*) FILTER (WHERE qualys_active = true) as qualys_ok,
COUNT(*) FILTER (WHERE sentinelone_active = true) as s1_ok,
COUNT(*) FILTER (WHERE running_not_enabled IS NOT NULL AND running_not_enabled != '') as no_autostart,
COUNT(*) FILTER (WHERE failed_services IS NOT NULL AND failed_services != '') as failed_svc
FROM server_audit
""")).fetchone()
return templates.TemplateResponse("audit.html", {
"request": request, "user": user, "app_name": APP_NAME,
"entries": entries, "stats": stats, "filter": filter,
"search": search,
})
@router.get("/audit/{audit_id}", response_class=HTMLResponse)
async def audit_detail(request: Request, audit_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
entry = db.execute(text("SELECT * FROM server_audit WHERE id = :id"),
{"id": audit_id}).fetchone()
if not entry:
return HTMLResponse("<p>Non trouve</p>")
return templates.TemplateResponse("partials/audit_detail.html", {
"request": request, "e": entry,
})