patchcenter/app/services/audit_service.py
Khalid MOUTAOUAKIL 53c393b49b Permissions DB, créneaux auto, assignations, audit Splunk, accents
- Permissions 100% depuis user_permissions (plus de hardcode)
- Middleware injecte perms dans chaque requête
- Créneaux auto: 09h-12h30 / 14h-16h45, pas 15min, hprod lun-mar, prod mer-jeu
- Assignations par défaut: par domaine, app_type, zone, serveur (table default_assignments)
- Auto-liaison app_group: même intervenant recette+prod
- Audit Splunk: /var/log/patchcenter_audit.json (JSON one-line par event)
- Login/logout/campagnes/prereqs loggés en base + fichier
- Page erreur maintenance (500/404) avec contact SecOps
- Accents français dans toute lUI
- Operator affiché comme Intervenant
- Session 1h, redirect / vers dashboard si connecté
- Demo mode prereqs (DEMO_MODE=True)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 15:25:43 +02:00

140 lines
5.5 KiB
Python

"""Service audit — log centralise de toutes les actions pour Splunk"""
import json
import logging
from datetime import datetime
from fastapi import Request
from sqlalchemy import text
logger = logging.getLogger("patchcenter.audit")
# Format JSON structure pour Splunk (une ligne par event)
LOG_FORMAT = '%(message)s'
handler = logging.FileHandler("/var/log/patchcenter_audit.json")
handler.setFormatter(logging.Formatter(LOG_FORMAT))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def log_action(db, request: Request, user: dict, action: str,
entity_type: str = None, entity_id: int = None,
details: dict = None):
"""Log une action dans la base ET dans le fichier JSON pour Splunk"""
username = user.get("sub", "system") if user else "system"
uid = user.get("uid") if user else None
ip = _get_client_ip(request) if request else None
# Insert en base
db.execute(text("""
INSERT INTO audit_log (user_id, username, action, entity_type, entity_id, details, ip_address)
VALUES (:uid, :un, :action, :et, :eid, :details, :ip)
"""), {
"uid": uid, "un": username, "action": action,
"et": entity_type, "eid": entity_id,
"details": json.dumps(details) if details else None,
"ip": ip,
})
# Log fichier JSON (Splunk-ready)
event = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"app": "patchcenter",
"action": action,
"username": username,
"user_id": uid,
"entity_type": entity_type,
"entity_id": entity_id,
"ip": ip,
"details": details,
}
logger.info(json.dumps(event, ensure_ascii=False))
def _get_client_ip(request: Request):
"""Extrait l'IP client (supporte X-Forwarded-For derriere nginx)"""
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return None
# === Actions predefinies ===
def log_login(db, request, user):
log_action(db, request, user, "LOGIN", "user", user.get("uid"))
def log_logout(db, request, user):
log_action(db, request, user, "LOGOUT", "user", user.get("uid"))
def log_login_failed(db, request, username):
log_action(db, request, None, "LOGIN_FAILED", "user", None,
{"username": username})
def log_campaign_create(db, request, user, campaign_id, label):
log_action(db, request, user, "CAMPAIGN_CREATE", "campaign", campaign_id,
{"label": label})
def log_campaign_status(db, request, user, campaign_id, old_status, new_status):
log_action(db, request, user, "CAMPAIGN_STATUS", "campaign", campaign_id,
{"old": old_status, "new": new_status})
def log_campaign_delete(db, request, user, campaign_id, label):
log_action(db, request, user, "CAMPAIGN_DELETE", "campaign", campaign_id,
{"label": label})
def log_session_exclude(db, request, user, session_id, hostname, reason):
log_action(db, request, user, "SESSION_EXCLUDE", "patch_session", session_id,
{"hostname": hostname, "reason": reason})
def log_session_assign(db, request, user, session_id, hostname, operator):
log_action(db, request, user, "SESSION_ASSIGN", "patch_session", session_id,
{"hostname": hostname, "operator": operator})
def log_session_take(db, request, user, session_id, hostname):
log_action(db, request, user, "SESSION_TAKE", "patch_session", session_id,
{"hostname": hostname})
def log_session_release(db, request, user, session_id, hostname):
log_action(db, request, user, "SESSION_RELEASE", "patch_session", session_id,
{"hostname": hostname})
def log_server_edit(db, request, user, server_id, hostname, changes):
log_action(db, request, user, "SERVER_EDIT", "server", server_id,
{"hostname": hostname, "changes": changes})
def log_prereq_check(db, request, user, campaign_id, checked, excluded):
log_action(db, request, user, "PREREQ_CHECK", "campaign", campaign_id,
{"checked": checked, "auto_excluded": excluded})
def log_user_create(db, request, user, new_user_id, new_username):
log_action(db, request, user, "USER_CREATE", "user", new_user_id,
{"new_username": new_username})
def log_user_edit(db, request, user, target_user_id, changes):
log_action(db, request, user, "USER_EDIT", "user", target_user_id,
{"changes": changes})
def log_user_delete(db, request, user, target_user_id, username):
log_action(db, request, user, "USER_DELETE", "user", target_user_id,
{"deleted_username": username})
def log_user_toggle(db, request, user, target_user_id, new_state):
log_action(db, request, user, "USER_TOGGLE", "user", target_user_id,
{"active": new_state})
def log_permissions_change(db, request, user, target_user_id, perms):
log_action(db, request, user, "PERMISSIONS_CHANGE", "user", target_user_id,
{"permissions": perms})
def log_setting_change(db, request, user, section):
log_action(db, request, user, "SETTING_CHANGE", "settings", None,
{"section": section})
def log_planning_change(db, request, user, action_type, entry_id=None, details=None):
log_action(db, request, user, f"PLANNING_{action_type}", "planning", entry_id, details)
def log_qualys_sync(db, request, user, server_id, hostname, result):
log_action(db, request, user, "QUALYS_SYNC", "server", server_id,
{"hostname": hostname, "result": result})