- server_audit_full_service.py: SSH PSMP/cle, parsing, stockage JSONB, flow map - server_audit.sh: script bash avec sudo (compatible PSMP cybsecope) - audit_full router: import JSON, liste, detail, carte flux - Templates: liste audits, detail 8 onglets, carte flux + carte applicative - Jointures: server_id via servers, dest_server via server_ips - Sous-menu Audit > Complet dans la sidebar Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
105 lines
4.6 KiB
Python
105 lines
4.6 KiB
Python
"""Router Audit Complet — import JSON, liste, detail, carte flux, carte applicative"""
|
|
import json
|
|
from fastapi import APIRouter, Request, Depends, UploadFile, File
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, base_context
|
|
from ..services.server_audit_full_service import (
|
|
import_json_report, get_latest_audits, get_audit_detail,
|
|
get_flow_map, get_flow_map_for_server, get_app_map,
|
|
)
|
|
from ..config import APP_NAME
|
|
|
|
# Router on which the full-audit endpoints of this module are registered.
router = APIRouter()
|
|
# Jinja2 template loader; template names are resolved under "app/templates".
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
@router.get("/audit-full", response_class=HTMLResponse)
async def audit_full_list(request: Request, db=Depends(get_db)):
    """Render the list of the latest full audits.

    Visitors without a session are redirected to ``/login``; users without
    view permission on "audit" are redirected to ``/dashboard``. The optional
    ``msg`` query parameter is handed to the template unchanged.
    """
    current_user = get_current_user(request)
    if not current_user:
        return RedirectResponse(url="/login")

    if not can_view(get_user_perms(db, current_user), "audit"):
        return RedirectResponse(url="/dashboard")

    latest = get_latest_audits(db)
    context = base_context(request, db, current_user)
    context.update(
        app_name=APP_NAME,
        audits=latest,
        msg=request.query_params.get("msg"),
    )
    return templates.TemplateResponse("audit_full_list.html", context)
|
|
|
|
|
|
@router.post("/audit-full/import")
async def audit_full_import(request: Request, db=Depends(get_db),
                            file: UploadFile = File(...)):
    """Import an uploaded JSON audit report, then redirect to the audit list.

    The upload is decoded as UTF-8 (a leading BOM is tolerated), parsed as
    JSON and passed to ``import_json_report``. The outcome is reported through
    the ``msg`` query parameter of the redirect target:

    * ``imported_<imported>_<errors>`` on success;
    * ``error_<first 50 chars of the exception text>`` on failure.

    Every redirect uses status 303 so the browser follows it with a GET
    instead of re-submitting the uploaded file to the target URL.
    """
    from urllib.parse import quote

    user = get_current_user(request)
    if not user:
        # The default redirect status (307) would make the browser re-POST
        # the multipart body to /login.
        return RedirectResponse(url="/login", status_code=303)

    # Same access rule as the list view.
    # NOTE(review): importing modifies data; a stricter (write) permission may
    # be more appropriate if the project defines one.
    perms = get_user_perms(db, user)
    if not can_view(perms, "audit"):
        return RedirectResponse(url="/dashboard", status_code=303)

    try:
        content = await file.read()
        json_data = json.loads(content.decode("utf-8-sig"))
        imported, errors = import_json_report(db, json_data)
    except Exception as e:
        # Deliberately broad: any failure (decoding, parsing, import) is
        # reported to the user instead of producing a 500 page.
        # The exception text is arbitrary, so percent-encode it; otherwise
        # characters such as '&', '#', '+' or '%' would truncate or corrupt
        # the "msg" query parameter.
        reason = quote(f"error_{str(e)[:50]}", safe="")
        return RedirectResponse(
            url=f"/audit-full?msg={reason}",
            status_code=303,
        )

    return RedirectResponse(
        url=f"/audit-full?msg=imported_{imported}_{errors}",
        status_code=303,
    )
|
|
|
|
|
|
def _json_field(value, expected_type, empty_json):
    """Return *value* as a decoded JSON container.

    If *value* is already an instance of *expected_type* it is returned
    unchanged; otherwise it is parsed with ``json.loads``, with a falsy value
    (``None``, ``""``) replaced by *empty_json* (``"[]"`` or ``"{}"``).
    """
    if isinstance(value, expected_type):
        return value
    return json.loads(value or empty_json)


# The ":int" convertor restricts this route to numeric ids. Without it the
# pattern also captures sibling paths such as "/audit-full/flow-map" (routes
# are matched in declaration order), which then fail integer validation.
@router.get("/audit-full/{audit_id:int}", response_class=HTMLResponse)
async def audit_full_detail(request: Request, audit_id: int, db=Depends(get_db)):
    """Render the detail page of one full audit.

    Redirects to ``/login`` without a session, to ``/dashboard`` without view
    permission on "audit", and to ``/audit-full`` when no audit exists for
    *audit_id*. Each JSON-backed attribute of the audit is exposed to the
    template as a decoded list (a dict for ``firewall``).
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    # Same access rule as the list view.
    perms = get_user_perms(db, user)
    if not can_view(perms, "audit"):
        return RedirectResponse(url="/dashboard")

    audit = get_audit_detail(db, audit_id)
    if not audit:
        return RedirectResponse(url="/audit-full")

    # Flows for this server
    flows = get_flow_map_for_server(db, audit.hostname)

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME, "a": audit, "flows": flows,
        "services": _json_field(audit.services, list, "[]"),
        "processes": _json_field(audit.processes, list, "[]"),
        "listen_ports": _json_field(audit.listen_ports, list, "[]"),
        "connections": _json_field(audit.connections, list, "[]"),
        "flux_in": _json_field(audit.flux_in, list, "[]"),
        "flux_out": _json_field(audit.flux_out, list, "[]"),
        "disk_usage": _json_field(audit.disk_usage, list, "[]"),
        "interfaces": _json_field(audit.interfaces, list, "[]"),
        "correlation": _json_field(audit.correlation_matrix, list, "[]"),
        "outbound": _json_field(audit.outbound_only, list, "[]"),
        "firewall": _json_field(audit.firewall, dict, "{}"),
        "conn_wait": _json_field(audit.conn_wait, list, "[]"),
        "traffic": _json_field(audit.traffic, list, "[]"),
    })
    return templates.TemplateResponse("audit_full_detail.html", ctx)
|
|
|
|
|
|
# NOTE(review): this route is declared after the "/audit-full/{audit_id}"
# route; unless that route limits its parameter to integers in the path
# pattern, it is matched first and this handler is never reached.
@router.get("/audit-full/flow-map", response_class=HTMLResponse)
async def audit_full_flow_map(request: Request, db=Depends(get_db)):
    """Render the flow map and application map page.

    Redirects to ``/login`` without a session and to ``/dashboard`` without
    view permission on "audit". The template receives the results of
    ``get_flow_map`` and ``get_app_map`` as ``flows`` and ``app_map``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    # Same access rule as the list view.
    perms = get_user_perms(db, user)
    if not can_view(perms, "audit"):
        return RedirectResponse(url="/dashboard")

    flows = get_flow_map(db)
    app_map = get_app_map(db)

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME, "flows": flows, "app_map": app_map,
    })
    return templates.TemplateResponse("audit_full_flowmap.html", ctx)
|