"""Audit service: centralized log of every user action, written for Splunk.

Each audited action is recorded twice: as a row in the ``audit_log`` table and
as one JSON document per line in the file named by ``AUDIT_LOG_PATH``.
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Optional

from fastapi import Request
from sqlalchemy import text

logger = logging.getLogger("patchcenter.audit")

# Structured JSON output for Splunk: the record is the message itself, one
# event per line, with no logging prefix in front of it.
LOG_FORMAT = '%(message)s'

log_path = os.getenv("AUDIT_LOG_PATH", "logs/patchcenter_audit.json")
# dirname() is empty for a bare file name; fall back to the current directory.
os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)


def _install_file_handler(path: str) -> logging.Handler:
    """Attach the audit file handler to ``logger`` once and return it.

    Re-importing or reloading this module would otherwise stack a second
    handler on the same logger and write every event twice.
    """
    target = os.path.abspath(path)
    for existing in logger.handlers:
        if (isinstance(existing, logging.FileHandler)
                and existing.baseFilename == target):
            return existing
    # Events are serialized with ensure_ascii=False, so the file encoding must
    # be pinned instead of inherited from the process locale.
    file_handler = logging.FileHandler(path, encoding="utf-8")
    file_handler.setFormatter(logging.Formatter(LOG_FORMAT))
    logger.addHandler(file_handler)
    return file_handler


handler = _install_file_handler(log_path)
logger.setLevel(logging.INFO)


def log_action(db, request: Optional[Request], user: Optional[dict],
               action: str, entity_type: Optional[str] = None,
               entity_id: Optional[int] = None,
               details: Optional[dict] = None):
    """Record an action in the ``audit_log`` table AND in the Splunk JSON file.

    Args:
        db: Object exposing ``execute(statement, params)``. This function does
            not commit; NOTE(review): presumably the caller owns the
            transaction -- confirm.
        request: Incoming request used to resolve the client IP, or ``None``.
        user: Dict carrying ``sub`` (username) and ``uid``; ``None`` records
            the action under the ``"system"`` username with no user id.
        action: Action code, e.g. ``"LOGIN"``.
        entity_type: Kind of object the action applies to.
        entity_id: Identifier of that object.
        details: Extra context. Stored as JSON text in the database (NULL when
            empty) and embedded as-is in the file event.
    """
    username = user.get("sub", "system") if user else "system"
    uid = user.get("uid") if user else None
    ip = _get_client_ip(request) if request else None

    # default=str keeps auditing from failing on values json cannot encode
    # natively (datetime, Decimal, UUID, ...), which may appear in `details`.
    details_json = json.dumps(details, default=str) if details else None

    # Database insert
    db.execute(text("""
        INSERT INTO audit_log (user_id, username, action, entity_type, entity_id, details, ip_address)
        VALUES (:uid, :un, :action, :et, :eid, :details, :ip)
    """), {
        "uid": uid,
        "un": username,
        "action": action,
        "et": entity_type,
        "eid": entity_id,
        "details": details_json,
        "ip": ip,
    })

    # JSON file event (Splunk-ready). The timestamp is taken from an aware UTC
    # clock and rendered without an offset so the "...Z" format is unchanged.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    event = {
        "timestamp": now_utc.isoformat() + "Z",
        "app": "patchcenter",
        "action": action,
        "username": username,
        "user_id": uid,
        "entity_type": entity_type,
        "entity_id": entity_id,
        "ip": ip,
        "details": details,
    }
    logger.info(json.dumps(event, ensure_ascii=False, default=str))


def _get_client_ip(request: Request):
    """Return the client IP, honouring ``X-Forwarded-For`` (nginx in front).

    The first entry of the header is used when it is non-empty; otherwise the
    socket peer address is returned, or ``None`` when no peer is known.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless the
    proxy overwrites it -- confirm the nginx configuration before relying on
    this value for security decisions.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        first_hop = forwarded.split(",")[0].strip()
        # A header such as ", 10.0.0.1" has an empty first entry; fall through
        # to the socket peer rather than recording an empty address.
        if first_hop:
            return first_hop
    if request.client:
        return request.client.host
    return None


# === Predefined actions ===

def log_login(db, request, user):
    """Record a successful login; the entity is the user itself."""
    log_action(db, request, user, "LOGIN", "user",
               user.get("uid") if user else None)


def log_logout(db, request, user):
    """Record a logout; the entity is the user itself."""
    log_action(db, request, user, "LOGOUT", "user",
               user.get("uid") if user else None)


def log_login_failed(db, request, username):
    """Record a failed login attempt for the attempted ``username``."""
    log_action(db, request, None, "LOGIN_FAILED", "user", None,
               {"username": username})


def log_campaign_create(db, request, user, campaign_id, label):
    """Record the creation of a campaign."""
    log_action(db, request, user, "CAMPAIGN_CREATE", "campaign", campaign_id,
               {"label": label})


def log_campaign_status(db, request, user, campaign_id, old_status, new_status):
    """Record a campaign status transition from ``old_status`` to ``new_status``."""
    log_action(db, request, user, "CAMPAIGN_STATUS", "campaign", campaign_id,
               {"old": old_status, "new": new_status})


def log_campaign_delete(db, request, user, campaign_id, label):
    """Record the deletion of a campaign."""
    log_action(db, request, user, "CAMPAIGN_DELETE", "campaign", campaign_id,
               {"label": label})


def log_session_exclude(db, request, user, session_id, hostname, reason):
    """Record the exclusion of a patch session, with its reason."""
    log_action(db, request, user, "SESSION_EXCLUDE", "patch_session", session_id,
               {"hostname": hostname, "reason": reason})


def log_session_assign(db, request, user, session_id, hostname, operator):
    """Record the assignment of a patch session to ``operator``."""
    log_action(db, request, user, "SESSION_ASSIGN", "patch_session", session_id,
               {"hostname": hostname, "operator": operator})


def log_session_take(db, request, user, session_id, hostname):
    """Record that the user took a patch session."""
    log_action(db, request, user, "SESSION_TAKE", "patch_session", session_id,
               {"hostname": hostname})


def log_session_release(db, request, user, session_id, hostname):
    """Record that the user released a patch session."""
    log_action(db, request, user, "SESSION_RELEASE", "patch_session", session_id,
               {"hostname": hostname})


def log_server_edit(db, request, user, server_id, hostname, changes):
    """Record an edit of a server, with the supplied ``changes``."""
    log_action(db, request, user, "SERVER_EDIT", "server", server_id,
               {"hostname": hostname, "changes": changes})


def log_prereq_check(db, request, user, campaign_id, checked, excluded):
    """Record a prerequisite check on a campaign.

    ``checked`` and ``excluded`` are stored under the ``checked`` and
    ``auto_excluded`` detail keys.
    """
    log_action(db, request, user, "PREREQ_CHECK", "campaign", campaign_id,
               {"checked": checked, "auto_excluded": excluded})


def log_user_create(db, request, user, new_user_id, new_username):
    """Record the creation of a user account."""
    log_action(db, request, user, "USER_CREATE", "user", new_user_id,
               {"new_username": new_username})


def log_user_edit(db, request, user, target_user_id, changes):
    """Record an edit of a user account, with the supplied ``changes``."""
    log_action(db, request, user, "USER_EDIT", "user", target_user_id,
               {"changes": changes})


def log_user_delete(db, request, user, target_user_id, username):
    """Record the deletion of a user account."""
    log_action(db, request, user, "USER_DELETE", "user", target_user_id,
               {"deleted_username": username})


def log_user_toggle(db, request, user, target_user_id, new_state):
    """Record a change of a user account's active state."""
    log_action(db, request, user, "USER_TOGGLE", "user", target_user_id,
               {"active": new_state})


def log_permissions_change(db, request, user, target_user_id, perms):
    """Record a change of a user's permissions."""
    log_action(db, request, user, "PERMISSIONS_CHANGE", "user", target_user_id,
               {"permissions": perms})


def log_setting_change(db, request, user, section):
    """Record a change in the given settings ``section``."""
    log_action(db, request, user, "SETTING_CHANGE", "settings", None,
               {"section": section})


def log_planning_change(db, request, user, action_type, entry_id=None, details=None):
    """Record a planning change; the action code is ``PLANNING_<action_type>``."""
    log_action(db, request, user, f"PLANNING_{action_type}", "planning",
               entry_id, details)


def log_qualys_sync(db, request, user, server_id, hostname, result):
    """Record a Qualys synchronisation for a server, with its ``result``."""
    log_action(db, request, user, "QUALYS_SYNC", "server", server_id,
               {"hostname": hostname, "result": result})