- Permissions 100% depuis user_permissions (plus de hardcode) - Middleware injecte perms dans chaque requête - Créneaux auto: 09h-12h30 / 14h-16h45, pas 15min, hprod lun-mar, prod mer-jeu - Assignations par défaut: par domaine, app_type, zone, serveur (table default_assignments) - Auto-liaison app_group: même intervenant recette+prod - Audit Splunk: /var/log/patchcenter_audit.json (JSON one-line par event) - Login/logout/campagnes/prereqs loggés en base + fichier - Page erreur maintenance (500/404) avec contact SecOps - Accents français dans toute lUI - Operator affiché comme Intervenant - Session 1h, redirect / vers dashboard si connecté - Demo mode prereqs (DEMO_MODE=True) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
140 lines
5.5 KiB
Python
140 lines
5.5 KiB
Python
"""Service audit — log centralise de toutes les actions pour Splunk"""
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from fastapi import Request
|
|
from sqlalchemy import text
|
|
|
|
logger = logging.getLogger("patchcenter.audit")
|
|
|
|
# Format JSON structure pour Splunk (une ligne par event)
|
|
LOG_FORMAT = '%(message)s'
|
|
handler = logging.FileHandler("/var/log/patchcenter_audit.json")
|
|
handler.setFormatter(logging.Formatter(LOG_FORMAT))
|
|
logger.addHandler(handler)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
|
|
def log_action(db, request: Request, user: dict, action: str,
|
|
entity_type: str = None, entity_id: int = None,
|
|
details: dict = None):
|
|
"""Log une action dans la base ET dans le fichier JSON pour Splunk"""
|
|
username = user.get("sub", "system") if user else "system"
|
|
uid = user.get("uid") if user else None
|
|
ip = _get_client_ip(request) if request else None
|
|
|
|
# Insert en base
|
|
db.execute(text("""
|
|
INSERT INTO audit_log (user_id, username, action, entity_type, entity_id, details, ip_address)
|
|
VALUES (:uid, :un, :action, :et, :eid, :details, :ip)
|
|
"""), {
|
|
"uid": uid, "un": username, "action": action,
|
|
"et": entity_type, "eid": entity_id,
|
|
"details": json.dumps(details) if details else None,
|
|
"ip": ip,
|
|
})
|
|
|
|
# Log fichier JSON (Splunk-ready)
|
|
event = {
|
|
"timestamp": datetime.utcnow().isoformat() + "Z",
|
|
"app": "patchcenter",
|
|
"action": action,
|
|
"username": username,
|
|
"user_id": uid,
|
|
"entity_type": entity_type,
|
|
"entity_id": entity_id,
|
|
"ip": ip,
|
|
"details": details,
|
|
}
|
|
logger.info(json.dumps(event, ensure_ascii=False))
|
|
|
|
|
|
def _get_client_ip(request: Request):
|
|
"""Extrait l'IP client (supporte X-Forwarded-For derriere nginx)"""
|
|
forwarded = request.headers.get("X-Forwarded-For")
|
|
if forwarded:
|
|
return forwarded.split(",")[0].strip()
|
|
if request.client:
|
|
return request.client.host
|
|
return None
|
|
|
|
|
|
# === Actions predefinies ===
|
|
|
|
def log_login(db, request, user):
|
|
log_action(db, request, user, "LOGIN", "user", user.get("uid"))
|
|
|
|
def log_logout(db, request, user):
|
|
log_action(db, request, user, "LOGOUT", "user", user.get("uid"))
|
|
|
|
def log_login_failed(db, request, username):
|
|
log_action(db, request, None, "LOGIN_FAILED", "user", None,
|
|
{"username": username})
|
|
|
|
def log_campaign_create(db, request, user, campaign_id, label):
|
|
log_action(db, request, user, "CAMPAIGN_CREATE", "campaign", campaign_id,
|
|
{"label": label})
|
|
|
|
def log_campaign_status(db, request, user, campaign_id, old_status, new_status):
|
|
log_action(db, request, user, "CAMPAIGN_STATUS", "campaign", campaign_id,
|
|
{"old": old_status, "new": new_status})
|
|
|
|
def log_campaign_delete(db, request, user, campaign_id, label):
|
|
log_action(db, request, user, "CAMPAIGN_DELETE", "campaign", campaign_id,
|
|
{"label": label})
|
|
|
|
def log_session_exclude(db, request, user, session_id, hostname, reason):
|
|
log_action(db, request, user, "SESSION_EXCLUDE", "patch_session", session_id,
|
|
{"hostname": hostname, "reason": reason})
|
|
|
|
def log_session_assign(db, request, user, session_id, hostname, operator):
|
|
log_action(db, request, user, "SESSION_ASSIGN", "patch_session", session_id,
|
|
{"hostname": hostname, "operator": operator})
|
|
|
|
def log_session_take(db, request, user, session_id, hostname):
|
|
log_action(db, request, user, "SESSION_TAKE", "patch_session", session_id,
|
|
{"hostname": hostname})
|
|
|
|
def log_session_release(db, request, user, session_id, hostname):
|
|
log_action(db, request, user, "SESSION_RELEASE", "patch_session", session_id,
|
|
{"hostname": hostname})
|
|
|
|
def log_server_edit(db, request, user, server_id, hostname, changes):
|
|
log_action(db, request, user, "SERVER_EDIT", "server", server_id,
|
|
{"hostname": hostname, "changes": changes})
|
|
|
|
def log_prereq_check(db, request, user, campaign_id, checked, excluded):
|
|
log_action(db, request, user, "PREREQ_CHECK", "campaign", campaign_id,
|
|
{"checked": checked, "auto_excluded": excluded})
|
|
|
|
def log_user_create(db, request, user, new_user_id, new_username):
|
|
log_action(db, request, user, "USER_CREATE", "user", new_user_id,
|
|
{"new_username": new_username})
|
|
|
|
def log_user_edit(db, request, user, target_user_id, changes):
|
|
log_action(db, request, user, "USER_EDIT", "user", target_user_id,
|
|
{"changes": changes})
|
|
|
|
def log_user_delete(db, request, user, target_user_id, username):
|
|
log_action(db, request, user, "USER_DELETE", "user", target_user_id,
|
|
{"deleted_username": username})
|
|
|
|
def log_user_toggle(db, request, user, target_user_id, new_state):
|
|
log_action(db, request, user, "USER_TOGGLE", "user", target_user_id,
|
|
{"active": new_state})
|
|
|
|
def log_permissions_change(db, request, user, target_user_id, perms):
|
|
log_action(db, request, user, "PERMISSIONS_CHANGE", "user", target_user_id,
|
|
{"permissions": perms})
|
|
|
|
def log_setting_change(db, request, user, section):
|
|
log_action(db, request, user, "SETTING_CHANGE", "settings", None,
|
|
{"section": section})
|
|
|
|
def log_planning_change(db, request, user, action_type, entry_id=None, details=None):
|
|
log_action(db, request, user, f"PLANNING_{action_type}", "planning", entry_id, details)
|
|
|
|
def log_qualys_sync(db, request, user, server_id, hostname, result):
|
|
log_action(db, request, user, "QUALYS_SYNC", "server", server_id,
|
|
{"hostname": hostname, "result": result})
|