"""Full audit router: JSON import, list, detail, flow map, application map."""
import json
import logging
import re
from urllib.parse import quote

from fastapi import APIRouter, Request, Depends, UploadFile, File
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text

from ..dependencies import get_db, get_current_user, get_user_perms, can_view, base_context
from ..services.server_audit_full_service import (
    import_json_report, get_latest_audits, get_audit_detail,
    get_flow_map, get_flow_map_for_server, get_app_map,
)
from ..config import APP_NAME

logger = logging.getLogger(__name__)

router = APIRouter()
templates = Jinja2Templates(directory="app/templates")

# Number of audits shown per page in the list view.
_PER_PAGE = 20

# Week threshold used by the "uptime_long" KPI; the Python-side filter below
# must use the same value so the filtered list matches the KPI count.
_LONG_UPTIME_WEEKS = 17
_UPTIME_WEEKS_RE = re.compile(r"(\d+) week")

# Sub-select returning the id of the most recent 'ok' audit for each hostname.
_LATEST_OK_AUDIT_IDS_SQL = (
    "SELECT DISTINCT ON (hostname) id FROM server_audit_full "
    "WHERE status = 'ok' ORDER BY hostname, audit_date DESC"
)

# Per-filter predicate applied to each element `d` of the disk_usage JSON
# array. These mirror the thresholds of the KPI query in audit_full_list.
# They are constants (never user input), so interpolating them is safe.
_DISK_FILTER_PREDICATES = {
    "disk_critical": "(d->>'pct')::int >= 90",
    "disk_warning": "(d->>'pct')::int >= 80 AND (d->>'pct')::int < 90",
}

# Template-context key -> audit attribute, for the fields rendered as lists
# on the detail page.
_DETAIL_LIST_FIELDS = (
    ("services", "services"),
    ("processes", "processes"),
    ("listen_ports", "listen_ports"),
    ("connections", "connections"),
    ("flux_in", "flux_in"),
    ("flux_out", "flux_out"),
    ("disk_usage", "disk_usage"),
    ("interfaces", "interfaces"),
    ("correlation", "correlation_matrix"),
    ("outbound", "outbound_only"),
    ("conn_wait", "conn_wait"),
    ("traffic", "traffic"),
)


def _require_audit_access(request, db, *, redirect_status=307):
    """Check that the caller is logged in and may view the "audit" section.

    Returns a ``(user, denied)`` pair: ``denied`` is ``None`` when access is
    granted, otherwise it is the redirect response the handler must return
    (to ``/login`` when unauthenticated, to ``/dashboard`` when the "audit"
    view permission is missing).

    ``redirect_status`` defaults to 307 (RedirectResponse's own default);
    POST handlers should pass 303 so the browser follows up with a GET.
    """
    user = get_current_user(request)
    if not user:
        return None, RedirectResponse(url="/login", status_code=redirect_status)
    perms = get_user_perms(db, user)
    if not can_view(perms, "audit"):
        return None, RedirectResponse(url="/dashboard", status_code=redirect_status)
    return user, None


def _parse_page(raw):
    """Parse the ``page`` query value, falling back to 1 when not an integer."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 1


def _is_long_uptime(uptime):
    """Return True when ``uptime`` matches the "uptime_long" KPI rule.

    Same rule as the KPI SQL: the text mentions "month" or "year", or it
    contains "<N> week" with N >= 17.
    """
    if not uptime:
        return False
    if "month" in uptime or "year" in uptime:
        return True
    match = _UPTIME_WEEKS_RE.search(uptime)
    return match is not None and int(match.group(1)) >= _LONG_UPTIME_WEEKS


def _latest_audit_ids_for_disk_filter(db, filter_name):
    """Return ids of latest 'ok' audits having a disk matching ``filter_name``.

    ``filter_name`` must be a key of ``_DISK_FILTER_PREDICATES``.
    """
    predicate = _DISK_FILTER_PREDICATES[filter_name]
    rows = db.execute(text(f"""
        SELECT saf.id FROM server_audit_full saf
        WHERE saf.status = 'ok'
          AND EXISTS (
              SELECT 1 FROM jsonb_array_elements(saf.disk_usage) d
              WHERE {predicate}
          )
          AND saf.id IN ({_LATEST_OK_AUDIT_IDS_SQL})
    """)).fetchall()
    return {row.id for row in rows}


def _load_json_field(value, expected_type, empty_json):
    """Return ``value`` if already of ``expected_type``, else JSON-decode it.

    A falsy ``value`` (None, empty string) is decoded from ``empty_json``.
    """
    if isinstance(value, expected_type):
        return value
    return json.loads(value or empty_json)


@router.get("/audit-full", response_class=HTMLResponse)
async def audit_full_list(request: Request, db=Depends(get_db)):
    """Render the paginated list of latest audits with KPIs and filters.

    Query parameters read: ``filter`` (reboot, disk_critical, disk_warning,
    uptime), ``q`` (hostname substring), ``domain`` (domain code), ``page``
    and ``msg`` (passed through to the template).
    """
    user, denied = _require_audit_access(request, db)
    if denied is not None:
        return denied

    params = request.query_params
    filtre = params.get("filter", "")
    search = params.get("q", "").strip()
    domain = params.get("domain", "")
    # A malformed ?page= value must not turn into a 500.
    page = _parse_page(params.get("page", "1"))

    # KPIs are always computed over the full set (latest 'ok' audit per
    # host), regardless of the active filters. Raw string: the SQL regex
    # contains backslashes.
    kpis = db.execute(text(r"""
        SELECT
            COUNT(*) as total,
            COUNT(*) FILTER (WHERE reboot_required = true) as needs_reboot,
            COUNT(*) FILTER (WHERE EXISTS (
                SELECT 1 FROM jsonb_array_elements(disk_usage) d
                WHERE (d->>'pct')::int >= 90
            )) as disk_critical,
            COUNT(*) FILTER (WHERE EXISTS (
                SELECT 1 FROM jsonb_array_elements(disk_usage) d
                WHERE (d->>'pct')::int >= 80 AND (d->>'pct')::int < 90
            )) as disk_warning,
            COUNT(*) FILTER (WHERE
                uptime LIKE '%month%' OR uptime LIKE '%year%'
                OR (uptime LIKE '%week%' AND (
                    CASE WHEN uptime ~ '(\d+) week'
                         THEN (substring(uptime from '(\d+) week'))::int
                         ELSE 0 END >= 17
                ))
            ) as uptime_long
        FROM server_audit_full
        WHERE status = 'ok'
        AND id IN (SELECT DISTINCT ON (hostname) id FROM server_audit_full
                   WHERE status = 'ok' ORDER BY hostname, audit_date DESC)
    """)).fetchone()

    # Domains offered in the filter drop-down.
    all_domains = db.execute(text(
        "SELECT code, name FROM domains ORDER BY name"
    )).fetchall()

    # Filters are applied in Python on the fetched audits.
    audits = get_latest_audits(db, limit=9999)

    # KPI filter: each branch mirrors the matching KPI definition above.
    if filtre == "reboot":
        audits = [a for a in audits if a.reboot_required]
    elif filtre in _DISK_FILTER_PREDICATES:
        ids = _latest_audit_ids_for_disk_filter(db, filtre)
        audits = [a for a in audits if a.id in ids]
    elif filtre == "uptime":
        audits = [a for a in audits if _is_long_uptime(a.uptime)]

    # Domain filter.
    if domain:
        domain_servers = {r.hostname for r in db.execute(text("""
            SELECT s.hostname FROM servers s
            JOIN domain_environments de ON s.domain_env_id = de.id
            JOIN domains d ON de.domain_id = d.id
            WHERE d.code = :dc
        """), {"dc": domain}).fetchall()}
        audits = [a for a in audits if a.hostname in domain_servers]

    # Case-insensitive hostname substring search.
    if search:
        needle = search.lower()
        audits = [a for a in audits if needle in a.hostname.lower()]

    # Pagination: clamp the requested page into [1, total_pages].
    total_filtered = len(audits)
    total_pages = max(1, (total_filtered + _PER_PAGE - 1) // _PER_PAGE)
    page = max(1, min(page, total_pages))
    audits_page = audits[(page - 1) * _PER_PAGE : page * _PER_PAGE]

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "audits": audits_page,
        "kpis": kpis,
        "filter": filtre,
        "search": search,
        "domain": domain,
        "all_domains": all_domains,
        "page": page,
        "total_pages": total_pages,
        "total_filtered": total_filtered,
        "msg": params.get("msg"),
    })
    return templates.TemplateResponse("audit_full_list.html", ctx)


@router.post("/audit-full/import")
async def audit_full_import(request: Request, db=Depends(get_db), file: UploadFile = File(...)):
    """Import an uploaded JSON audit report, then redirect to the list view.

    The outcome is carried in the ``msg`` query parameter:
    ``imported_<imported>_<errors>`` on success, ``error_<text>`` on failure
    (first 50 characters of the exception message).
    """
    # 303 so the browser follows the redirect with a GET instead of
    # re-sending the upload.
    # NOTE(review): only the "audit" view permission is checked here; a
    # stricter write permission may be appropriate - confirm.
    _user, denied = _require_audit_access(request, db, redirect_status=303)
    if denied is not None:
        return denied

    try:
        content = await file.read()
        # utf-8-sig strips a leading BOM if the file has one.
        json_data = json.loads(content.decode("utf-8-sig"))
        imported, errors = import_json_report(db, json_data)
    except Exception as e:
        # Request boundary: report any failure to the user, but keep the
        # traceback in the logs.
        logger.exception("Audit report import failed")
        # Percent-encode the message: exception text may contain spaces,
        # '&', '#' or newlines that would corrupt the redirect URL.
        reason = quote(str(e)[:50], safe="")
        return RedirectResponse(
            url=f"/audit-full?msg=error_{reason}",
            status_code=303,
        )
    return RedirectResponse(
        url=f"/audit-full?msg=imported_{imported}_{errors}",
        status_code=303,
    )


@router.get("/audit-full/flow-map", response_class=HTMLResponse)
async def audit_full_flow_map(request: Request, db=Depends(get_db)):
    """Render the flow map page with the flow map and the application map."""
    user, denied = _require_audit_access(request, db)
    if denied is not None:
        return denied

    flows = get_flow_map(db)
    app_map = get_app_map(db)

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "flows": flows,
        "app_map": app_map,
    })
    return templates.TemplateResponse("audit_full_flowmap.html", ctx)


@router.get("/audit-full/{audit_id}", response_class=HTMLResponse)
async def audit_full_detail(request: Request, audit_id: int, db=Depends(get_db)):
    """Render the detail page of one audit; redirect to the list if unknown."""
    user, denied = _require_audit_access(request, db)
    if denied is not None:
        return denied

    audit = get_audit_detail(db, audit_id)
    if not audit:
        return RedirectResponse(url="/audit-full")

    # Flows involving this server.
    flows = get_flow_map_for_server(db, audit.hostname)

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "a": audit,
        "flows": flows,
        "firewall": _load_json_field(audit.firewall, dict, "{}"),
    })
    # Each JSON field is used as-is when already a list, otherwise decoded
    # from its JSON text.
    ctx.update({
        key: _load_json_field(getattr(audit, attr), list, "[]")
        for key, attr in _DETAIL_LIST_FIELDS
    })
    return templates.TemplateResponse("audit_full_detail.html", ctx)