patchcenter/app/routers/audit.py

307 lines
12 KiB
Python

"""Router audit serveurs — résultats des scans + audit temps réel"""
import csv, io
from fastapi import APIRouter, Request, Depends, Query, Form, UploadFile, File
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..services.realtime_audit_service import audit_servers_list, save_audit_to_db
from ..config import APP_NAME
# Router on which every audit endpoint of this module is registered.
router = APIRouter()
# Jinja2 renderer for this module's HTML responses. The directory is a relative
# path, so it presumably resolves against the process working directory.
templates = Jinja2Templates(directory="app/templates")
@router.get("/audit", response_class=HTMLResponse)
async def audit_page(request: Request, db=Depends(get_db),
                     filter: str = Query(None), search: str = Query(None)):
    """Render the stored audit results page.

    Args:
        request: Incoming request, used for authentication and templating.
        db: Database session injected by ``get_db``.
        filter: Optional predefined filter name (``failed``, ``disk``,
            ``no_qualys``, ``no_s1``, ``no_autostart``, ``failed_svc``).
            Any other value applies no extra condition.
        search: Optional substring matched case-insensitively on the hostname.

    Returns:
        A redirect to ``/login`` when no user is authenticated, a redirect to
        ``/dashboard`` when the user may not view audits, otherwise the
        rendered ``audit.html`` template.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_view(get_user_perms(db, user), "audit"):
        return RedirectResponse(url="/dashboard")

    # One fixed SQL fragment per supported filter name; nothing user-supplied
    # is ever interpolated into the statement.
    clause_by_filter = {
        "failed": "sa.status = 'CONNECTION_FAILED'",
        "disk": "sa.disk_alert = true",
        "no_qualys": "sa.qualys_active = false AND sa.status = 'OK'",
        "no_s1": "sa.sentinelone_active = false AND sa.status = 'OK'",
        "no_autostart": "sa.running_not_enabled IS NOT NULL AND sa.running_not_enabled != '' AND sa.status = 'OK'",
        "failed_svc": "sa.failed_services IS NOT NULL AND sa.failed_services != '' AND sa.status = 'OK'",
    }

    conditions = ["1=1"]
    bind = {}
    selected_clause = clause_by_filter.get(filter)
    if selected_clause is not None:
        conditions.append(selected_clause)
    if search:
        conditions.append("sa.hostname ILIKE :q")
        bind["q"] = f"%{search}%"
    where_sql = " AND ".join(conditions)

    # Filtered listing, capped at 500 rows, disk alerts first.
    listing_sql = text(f"""
        SELECT sa.*, d.name as domaine, e.name as environnement
        FROM server_audit sa
        LEFT JOIN servers s ON sa.server_id = s.id
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE {where_sql}
        ORDER BY sa.disk_alert DESC, sa.status, sa.hostname
        LIMIT 500
    """)
    entries = db.execute(listing_sql, bind).fetchall()

    # Counters are computed over the whole table, independent of the filter.
    stats_sql = text("""
        SELECT
        COUNT(*) as total,
        COUNT(*) FILTER (WHERE status = 'OK') as ok,
        COUNT(*) FILTER (WHERE status = 'CONNECTION_FAILED') as failed,
        COUNT(*) FILTER (WHERE disk_alert = true) as disk_alerts,
        COUNT(*) FILTER (WHERE qualys_active = true) as qualys_ok,
        COUNT(*) FILTER (WHERE sentinelone_active = true) as s1_ok,
        COUNT(*) FILTER (WHERE running_not_enabled IS NOT NULL AND running_not_enabled != '') as no_autostart,
        COUNT(*) FILTER (WHERE failed_services IS NOT NULL AND failed_services != '') as failed_svc
        FROM server_audit
    """)
    stats = db.execute(stats_sql).fetchone()

    # Date of the most recent audit and number of audited rows.
    last_audit_sql = text(
        "SELECT MAX(audit_date) as last_date, COUNT(*) as count FROM server_audit"
    )
    last_audit = db.execute(last_audit_sql).fetchone()

    # Domains and zones offered as exclusion choices on the page.
    domains = db.execute(
        text("SELECT code, name FROM domains ORDER BY display_order")
    ).fetchall()
    zones = db.execute(
        text("SELECT DISTINCT name FROM zones ORDER BY name")
    ).fetchall()

    page_data = {
        "app_name": APP_NAME,
        "entries": entries,
        "stats": stats,
        "filter": filter,
        "search": search,
        "last_audit": last_audit,
        "domains": domains,
        "zones": zones,
    }
    ctx = base_context(request, db, user)
    ctx.update(page_data)
    return templates.TemplateResponse("audit.html", ctx)
@router.get("/audit/realtime", response_class=HTMLResponse)
async def audit_realtime_redirect(request: Request):
    """Send a GET on the real-time audit URL to the dashboard with a 302."""
    destination = "/dashboard"
    return RedirectResponse(url=destination, status_code=302)
@router.get("/audit/specific", response_class=HTMLResponse)
async def audit_specific_page(request: Request, db=Depends(get_db)):
    """Render the ``audit_specific.html`` page.

    Returns a redirect to ``/login`` when no user is authenticated and a
    redirect to ``/audit`` when the user lacks edit rights on ``audit``.
    """
    current_user = get_current_user(request)
    if not current_user:
        return RedirectResponse(url="/login")

    if not can_edit(get_user_perms(db, current_user), "audit"):
        return RedirectResponse(url="/audit")

    context = base_context(request, db, current_user)
    context.update({"app_name": APP_NAME})
    return templates.TemplateResponse("audit_specific.html", context)
@router.get("/audit/{audit_id}", response_class=HTMLResponse)
async def audit_detail(request: Request, audit_id: int, db=Depends(get_db)):
    """Render the detail fragment for one ``server_audit`` row.

    Args:
        request: Incoming request, used for authentication and templating.
        audit_id: Primary key of the ``server_audit`` row to display.
        db: Database session injected by ``get_db``.

    Returns:
        The rendered ``partials/audit_detail.html`` fragment, or a short HTML
        message when the caller is not allowed to see it or the row does not
        exist. Both messages keep the default 200 status, as before.
    """
    user = get_current_user(request)
    # Being logged in is not enough: like the other audit routes, require view
    # permission on "audit" so rows cannot be read by id without it.
    if not user or not can_view(get_user_perms(db, user), "audit"):
        return HTMLResponse("<p>Non autorisé</p>")

    entry = db.execute(
        text("SELECT * FROM server_audit WHERE id = :id"),
        {"id": audit_id},
    ).fetchone()
    if not entry:
        return HTMLResponse("<p>Non trouvé</p>")

    return templates.TemplateResponse("partials/audit_detail.html", {
        "request": request, "e": entry,
    })
# --- Real-time audit ---
@router.post("/audit/global", response_class=HTMLResponse)
async def audit_global(request: Request, db=Depends(get_db)):
    """Run a global audit on all production Linux servers, minus exclusions.

    The form may carry repeated ``exclude_domains`` (domain codes) and
    ``exclude_zones`` (zone names) fields; servers matching any of them are
    left out of the audit.

    Returns:
        A 303 redirect to ``/login`` or ``/audit`` when the caller is not
        authenticated or lacks edit rights, a 303 redirect to
        ``/audit?msg=no_hosts`` when no server matches, otherwise the rendered
        ``audit_realtime_results.html`` template.
    """
    user = get_current_user(request)
    # 303 rather than the default 307: a 307 would make the browser re-POST the
    # form to the target, and /audit is declared GET-only in this module.
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "audit"):
        return RedirectResponse(url="/audit", status_code=303)

    form = await request.form()
    exclude_domains = form.getlist("exclude_domains")
    exclude_zones = form.getlist("exclude_zones")
    # NOTE(review): the form's "parallel" field used to be parsed with int()
    # and then discarded, so a non-numeric value only caused a 500. It is no
    # longer read; forward it to audit_servers_list if that function supports it.

    where = ["s.os_family = 'linux'", "s.etat = 'en_production'"]
    params = {}
    # One named bind parameter per excluded value (":ed0", ":ed1", ...), so the
    # values are never interpolated into the SQL text.
    for column, prefix, values in (
        ("d.code", "ed", exclude_domains),
        ("z.name", "ez", exclude_zones),
    ):
        if not values:
            continue
        names = [f"{prefix}{i}" for i in range(len(values))]
        placeholders = ",".join(f":{name}" for name in names)
        where.append(f"{column} NOT IN ({placeholders})")
        params.update(zip(names, values))
    wc = " AND ".join(where)

    # NOTE(review): with the LEFT JOINs, a server that has no domain (or zone)
    # yields NULL for d.code (z.name) and is therefore dropped as soon as the
    # matching exclusion list is non-empty. Confirm this is intended.
    query = f"""
        SELECT s.hostname FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN zones z ON s.zone_id = z.id
        WHERE {wc} ORDER BY s.hostname
    """
    rows = db.execute(text(query), params).fetchall()
    hostnames = [r.hostname for r in rows]
    if not hostnames:
        return RedirectResponse(url="/audit?msg=no_hosts", status_code=303)

    # Run the audit. NOTE(review): this is a synchronous call inside an async
    # handler; confirm it is acceptable to hold the event loop for its duration.
    results = audit_servers_list(hostnames)
    # Kept on the application object (shared by all requests in this process)
    # so the save endpoint can persist the latest run.
    request.app.state.last_audit_results = results

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME, "results": results,
        "total": len(results),
        "ok": sum(1 for r in results if r.get("status") == "OK"),
        "failed": sum(1 for r in results if r.get("status") != "OK"),
    })
    response = templates.TemplateResponse("audit_realtime_results.html", ctx)
    # Same no-cache policy as the real-time audit handler, which renders the
    # same results template.
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    response.headers["Pragma"] = "no-cache"
    return response
@router.post("/audit/realtime", response_class=HTMLResponse)
async def audit_realtime(request: Request, db=Depends(get_db),
                         hostnames_text: str = Form(""),
                         hostnames_file: UploadFile = File(None)):
    """Audit an explicit list of hosts and render the results.

    Args:
        request: Incoming request, used for authentication and templating.
        db: Database session injected by ``get_db``.
        hostnames_text: Hostnames separated by commas and/or newlines.
        hostnames_file: Optional uploaded text file with one hostname per
            line; blank lines and lines starting with ``#`` are ignored.

    Returns:
        A 303 redirect to ``/login`` or ``/audit`` when the caller is not
        authenticated or lacks edit rights, a 303 redirect to
        ``/audit?msg=no_hosts`` when no hostname was supplied, otherwise the
        rendered ``audit_realtime_results.html`` template with caching
        disabled.
    """
    user = get_current_user(request)
    # 303 rather than the default 307: a 307 would make the browser re-POST the
    # form to the target, and /audit is declared GET-only in this module.
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "audit"):
        return RedirectResponse(url="/audit", status_code=303)

    # Hostnames typed in the form: commas are treated like line breaks.
    typed_lines = hostnames_text.replace(",", "\n").splitlines()
    hostnames = [host for host in (part.strip() for part in typed_lines) if host]

    if hostnames_file and hostnames_file.filename:
        raw = await hostnames_file.read()
        # "utf-8-sig" drops a leading BOM, which would otherwise stick to the
        # first hostname (str.strip() does not remove U+FEFF).
        content = raw.decode("utf-8-sig", errors="replace")
        # Test for "#" on the stripped line so indented comments are skipped too.
        hostnames += [
            host
            for host in (line.strip() for line in content.splitlines())
            if host and not host.startswith("#")
        ]

    if not hostnames:
        return RedirectResponse(url="/audit?msg=no_hosts", status_code=303)

    # Run the audit.
    results = audit_servers_list(hostnames)
    # Kept on the application object, not in a per-user session: the value is
    # shared by all requests in this process and is what the save endpoint reads.
    request.app.state.last_audit_results = results

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME, "results": results,
        "total": len(results),
        "ok": sum(1 for r in results if r.get("status") == "OK"),
        "failed": sum(1 for r in results if r.get("status") != "OK"),
    })
    response = templates.TemplateResponse("audit_realtime_results.html", ctx)
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    response.headers["Pragma"] = "no-cache"
    return response
@router.post("/audit/realtime/save")
async def audit_realtime_save(request: Request, db=Depends(get_db)):
    """Persist the most recent real-time audit results.

    The results are read from ``request.app.state.last_audit_results``.

    Returns:
        A 303 redirect in every case: to ``/login`` or ``/audit`` when the
        caller is not authenticated or lacks edit rights, to
        ``/audit?msg=no_results`` when nothing is stored, otherwise to
        ``/audit?msg=saved_<updated>_<inserted>``.
    """
    user = get_current_user(request)
    # 303 rather than the default 307: a 307 would make the browser re-POST to
    # the target, and /audit is declared GET-only in this module.
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "audit"):
        return RedirectResponse(url="/audit", status_code=303)

    # NOTE(review): app.state is shared by all requests in this process, so
    # this is the latest run by anyone, not necessarily by this user.
    results = getattr(request.app.state, "last_audit_results", None)
    if not results:
        return RedirectResponse(url="/audit?msg=no_results", status_code=303)

    updated, inserted = save_audit_to_db(db, results)
    return RedirectResponse(url=f"/audit?msg=saved_{updated}_{inserted}", status_code=303)
@router.get("/audit/export/csv")
async def audit_export_csv(request: Request, db=Depends(get_db),
                           filter: str = Query(None), search: str = Query(None)):
    """Export stored audit rows as a semicolon-delimited CSV attachment.

    Args:
        request: Incoming request, used for authentication.
        db: Database session injected by ``get_db``.
        filter: Optional predefined filter name (``failed``, ``disk``,
            ``no_qualys``, ``no_s1``, ``no_autostart``, ``failed_svc``), the
            same set the audit page accepts. Any other value exports all rows.
        search: Optional substring matched case-insensitively on the hostname.

    Returns:
        A redirect to ``/login`` or ``/audit`` when the caller is not
        authenticated or may not view audits, otherwise a CSV download named
        ``audit_export_<filter>.csv`` (``all`` when no known filter applies).
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "audit"):
        return RedirectResponse(url="/audit")

    # Same filters as the audit page, so an export matches what is on screen.
    filter_clauses = {
        "failed": "sa.status = 'CONNECTION_FAILED'",
        "disk": "sa.disk_alert = true",
        "no_qualys": "sa.qualys_active = false AND sa.status = 'OK'",
        "no_s1": "sa.sentinelone_active = false AND sa.status = 'OK'",
        "no_autostart": "sa.running_not_enabled IS NOT NULL AND sa.running_not_enabled != '' AND sa.status = 'OK'",
        "failed_svc": "sa.failed_services IS NOT NULL AND sa.failed_services != '' AND sa.status = 'OK'",
    }

    where = ["1=1"]
    params = {}
    clause = filter_clauses.get(filter)
    if clause:
        where.append(clause)
    if search:
        where.append("sa.hostname ILIKE :q")
        params["q"] = f"%{search}%"
    wc = " AND ".join(where)

    rows = db.execute(text(f"""
        SELECT sa.hostname, sa.status, sa.connection_method, sa.resolved_fqdn,
        sa.os_release, sa.kernel, sa.uptime, sa.selinux,
        sa.disk_detail, sa.disk_alert, sa.apps_installed,
        sa.services_running, sa.running_not_enabled, sa.listening_ports,
        sa.db_detected, sa.cluster_detected, sa.containers,
        sa.agents, sa.qualys_active, sa.sentinelone_active,
        sa.failed_services, sa.audit_date
        FROM server_audit sa WHERE {wc} ORDER BY sa.hostname
    """), params).fetchall()

    output = io.StringIO()
    writer = csv.writer(output, delimiter=";")
    headers = ["Hostname", "Statut", "Connexion", "FQDN", "OS", "Kernel", "Uptime", "SELinux",
               "Disque", "Alerte disque", "Applications", "Services running",
               "Sans auto-start", "Ports", "BDD", "Cluster", "Containers",
               "Agents", "Qualys", "SentinelOne", "Services KO", "Date audit"]
    writer.writerow(headers)
    for r in rows:
        # Long free-text columns are truncated to keep each cell short.
        writer.writerow([
            r.hostname, r.status, r.connection_method, r.resolved_fqdn,
            (r.os_release or "")[:80], r.kernel, r.uptime, r.selinux,
            (r.disk_detail or "")[:100], "OUI" if r.disk_alert else "NON",
            (r.apps_installed or "")[:100], (r.services_running or "")[:100],
            (r.running_not_enabled or "")[:100], (r.listening_ports or "")[:100],
            r.db_detected, r.cluster_detected, (r.containers or "")[:80],
            r.agents, "OUI" if r.qualys_active else "NON",
            "OUI" if r.sentinelone_active else "NON", r.failed_services,
            r.audit_date.strftime("%Y-%m-%d %H:%M") if r.audit_date else "",
        ])

    # Only a recognised filter name goes into the header: the raw query value is
    # user-controlled and must not reach Content-Disposition unchecked.
    label = filter if filter in filter_clauses else "all"
    return StreamingResponse(
        iter([output.getvalue()]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=audit_export_{label}.csv"}
    )