patchcenter/app/routers/audit_full.py
Khalid MOUTAOUAKIL 20cd9c7d80 Audit complet: import JSON, carte flux, carte applicative
- server_audit_full_service.py: SSH PSMP/cle, parsing, stockage JSONB, flow map
- server_audit.sh: script bash avec sudo (compatible PSMP cybsecope)
- audit_full router: import JSON, liste, detail, carte flux
- Templates: liste audits, detail 8 onglets, carte flux + carte applicative
- Jointures: server_id via servers, dest_server via server_ips
- Sous-menu Audit > Complet dans la sidebar

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 16:06:55 +02:00

105 lines
4.6 KiB
Python

"""Router Audit Complet — import JSON, liste, detail, carte flux, carte applicative"""
import json
from fastapi import APIRouter, Request, Depends, UploadFile, File
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, base_context
from ..services.server_audit_full_service import (
import_json_report, get_latest_audits, get_audit_detail,
get_flow_map, get_flow_map_for_server, get_app_map,
)
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
@router.get("/audit-full", response_class=HTMLResponse)
async def audit_full_list(request: Request, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_view(perms, "audit"):
return RedirectResponse(url="/dashboard")
audits = get_latest_audits(db)
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME, "audits": audits,
"msg": request.query_params.get("msg"),
})
return templates.TemplateResponse("audit_full_list.html", ctx)
@router.post("/audit-full/import")
async def audit_full_import(request: Request, db=Depends(get_db),
file: UploadFile = File(...)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
try:
content = await file.read()
json_data = json.loads(content.decode("utf-8-sig"))
imported, errors = import_json_report(db, json_data)
return RedirectResponse(
url=f"/audit-full?msg=imported_{imported}_{errors}",
status_code=303,
)
except Exception as e:
return RedirectResponse(
url=f"/audit-full?msg=error_{str(e)[:50]}",
status_code=303,
)
@router.get("/audit-full/{audit_id}", response_class=HTMLResponse)
async def audit_full_detail(request: Request, audit_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
audit = get_audit_detail(db, audit_id)
if not audit:
return RedirectResponse(url="/audit-full")
# Flux pour ce serveur
flows = get_flow_map_for_server(db, audit.hostname)
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME, "a": audit, "flows": flows,
"services": audit.services if isinstance(audit.services, list) else json.loads(audit.services or "[]"),
"processes": audit.processes if isinstance(audit.processes, list) else json.loads(audit.processes or "[]"),
"listen_ports": audit.listen_ports if isinstance(audit.listen_ports, list) else json.loads(audit.listen_ports or "[]"),
"connections": audit.connections if isinstance(audit.connections, list) else json.loads(audit.connections or "[]"),
"flux_in": audit.flux_in if isinstance(audit.flux_in, list) else json.loads(audit.flux_in or "[]"),
"flux_out": audit.flux_out if isinstance(audit.flux_out, list) else json.loads(audit.flux_out or "[]"),
"disk_usage": audit.disk_usage if isinstance(audit.disk_usage, list) else json.loads(audit.disk_usage or "[]"),
"interfaces": audit.interfaces if isinstance(audit.interfaces, list) else json.loads(audit.interfaces or "[]"),
"correlation": audit.correlation_matrix if isinstance(audit.correlation_matrix, list) else json.loads(audit.correlation_matrix or "[]"),
"outbound": audit.outbound_only if isinstance(audit.outbound_only, list) else json.loads(audit.outbound_only or "[]"),
"firewall": audit.firewall if isinstance(audit.firewall, dict) else json.loads(audit.firewall or "{}"),
"conn_wait": audit.conn_wait if isinstance(audit.conn_wait, list) else json.loads(audit.conn_wait or "[]"),
"traffic": audit.traffic if isinstance(audit.traffic, list) else json.loads(audit.traffic or "[]"),
})
return templates.TemplateResponse("audit_full_detail.html", ctx)
@router.get("/audit-full/flow-map", response_class=HTMLResponse)
async def audit_full_flow_map(request: Request, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
flows = get_flow_map(db)
app_map = get_app_map(db)
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME, "flows": flows, "app_map": app_map,
})
return templates.TemplateResponse("audit_full_flowmap.html", ctx)