patchcenter/app/routers/audit.py
Khalid MOUTAOUAKIL 53c393b49b Permissions DB, créneaux auto, assignations, audit Splunk, accents
- Permissions 100% depuis user_permissions (plus de hardcode)
- Middleware injecte perms dans chaque requête
- Créneaux auto: 09h-12h30 / 14h-16h45, pas 15min, hprod lun-mar, prod mer-jeu
- Assignations par défaut: par domaine, app_type, zone, serveur (table default_assignments)
- Auto-liaison app_group: même intervenant recette+prod
- Audit Splunk: /var/log/patchcenter_audit.json (JSON one-line par event)
- Login/logout/campagnes/prereqs loggés en base + fichier
- Page erreur maintenance (500/404) avec contact SecOps
- Accents français dans toute l'UI
- Operator affiché comme Intervenant
- Session 1h, redirect / vers dashboard si connecté
- Demo mode prereqs (DEMO_MODE=True)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 15:25:43 +02:00

85 lines
3.4 KiB
Python

"""Router audit serveurs — resultats des scans d'audit"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..config import APP_NAME
# Router on which this module's /audit endpoints are registered.
router = APIRouter()
# Jinja2 template loader. The directory is a relative path, so it presumably
# resolves against the process working directory — confirm with the deployment.
templates = Jinja2Templates(directory="app/templates")
@router.get("/audit", response_class=HTMLResponse)
async def audit_page(request: Request, db=Depends(get_db),
                     filter: str = Query(None), search: str = Query(None)):
    """Render the server-audit list page.

    Args:
        request: Incoming request; used to resolve the current user and
            passed to the template.
        db: Database session provided by ``get_db``.
        filter: Optional name of a predefined filter (``failed``, ``disk``,
            ``no_qualys``, ``no_s1``, ``no_autostart``, ``failed_svc``).
            Any other value is ignored. The name shadows the builtin but is
            kept because it is the public query-parameter name.
        search: Optional substring matched case-insensitively against the
            audited hostname.

    Returns:
        A redirect to ``/login`` when no user is resolved, otherwise the
        rendered ``audit.html`` template with at most 500 entries and the
        global (unfiltered) statistics.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    # Constant SQL fragments keyed by the ``filter`` value. Only these fixed
    # strings are interpolated into the query text; user input never is.
    filter_clauses = {
        "failed": "sa.status = 'CONNECTION_FAILED'",
        "disk": "sa.disk_alert = true",
        "no_qualys": "sa.qualys_active = false AND sa.status = 'OK'",
        "no_s1": "sa.sentinelone_active = false AND sa.status = 'OK'",
        "no_autostart": ("sa.running_not_enabled IS NOT NULL "
                         "AND sa.running_not_enabled != '' AND sa.status = 'OK'"),
        "failed_svc": ("sa.failed_services IS NOT NULL "
                       "AND sa.failed_services != '' AND sa.status = 'OK'"),
    }

    where = ["1=1"]
    params = {}

    clause = filter_clauses.get(filter)
    if clause is not None:
        where.append(clause)

    if search:
        # Escape LIKE metacharacters so the term is matched literally: without
        # this, "_" matches any character and "%" matches every hostname.
        # "!" is used as the escape character to avoid backslash quoting.
        escaped = (search.replace("!", "!!")
                   .replace("%", "!%")
                   .replace("_", "!_"))
        where.append("sa.hostname ILIKE :q ESCAPE '!'")
        params["q"] = f"%{escaped}%"

    wc = " AND ".join(where)
    entries = db.execute(text(f"""
        SELECT sa.*, d.name as domaine, e.name as environnement
        FROM server_audit sa
        LEFT JOIN servers s ON sa.server_id = s.id
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE {wc}
        ORDER BY sa.disk_alert DESC, sa.status, sa.hostname
        LIMIT 500
    """), params).fetchall()

    # Stats are computed over the whole table, independently of the active
    # filter and search term.
    stats = db.execute(text("""
        SELECT
        COUNT(*) as total,
        COUNT(*) FILTER (WHERE status = 'OK') as ok,
        COUNT(*) FILTER (WHERE status = 'CONNECTION_FAILED') as failed,
        COUNT(*) FILTER (WHERE disk_alert = true) as disk_alerts,
        COUNT(*) FILTER (WHERE qualys_active = true) as qualys_ok,
        COUNT(*) FILTER (WHERE sentinelone_active = true) as s1_ok,
        COUNT(*) FILTER (WHERE running_not_enabled IS NOT NULL AND running_not_enabled != '') as no_autostart,
        COUNT(*) FILTER (WHERE failed_services IS NOT NULL AND failed_services != '') as failed_svc
        FROM server_audit
    """)).fetchone()

    return templates.TemplateResponse("audit.html", {
        "request": request, "user": user, "app_name": APP_NAME,
        "entries": entries, "stats": stats, "filter": filter,
        "search": search,
    })
@router.get("/audit/{audit_id}", response_class=HTMLResponse)
async def audit_detail(request: Request, audit_id: int, db=Depends(get_db)):
    """Return the HTML fragment describing a single audit record.

    Args:
        request: Incoming request; used to resolve the current user and
            passed to the template.
        audit_id: Value matched against ``server_audit.id``.
        db: Database session provided by ``get_db``.

    Returns:
        A short HTML message when no user is resolved or when no row matches
        (neither sets an explicit status code), otherwise the rendered
        ``partials/audit_detail.html`` template with the row exposed as ``e``.
    """
    if not get_current_user(request):
        return HTMLResponse("<p>Non autorise</p>")

    lookup = text("SELECT * FROM server_audit WHERE id = :id")
    row = db.execute(lookup, {"id": audit_id}).fetchone()
    if not row:
        return HTMLResponse("<p>Non trouve</p>")

    context = {"request": request, "e": row}
    return templates.TemplateResponse("partials/audit_detail.html", context)