patchcenter/app/services/server_audit_full_service.py
Khalid MOUTAOUAKIL 6834710af6 Fix CAST inet + IPv6 cleanup dans resolve_dest_server
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 16:23:32 +02:00

504 lines
21 KiB
Python

"""Service audit complet serveur — applicatif + reseau + correlation + carte flux
Adapte du standalone SANEF corrige pour PatchCenter (FastAPI/PostgreSQL)
"""
import json
import re
import os
import socket
import logging
from datetime import datetime
from sqlalchemy import text
# Silence paramiko's own loggers: only CRITICAL records get through.
logging.getLogger("paramiko").setLevel(logging.CRITICAL)
logging.getLogger("paramiko.transport").setLevel(logging.CRITICAL)

# paramiko is optional at import time; PARAMIKO_OK records whether it loaded.
try:
    import paramiko
except ImportError:
    PARAMIKO_OK = False
else:
    PARAMIKO_OK = True

# --- SSH access settings -------------------------------------------------
# Private key file used for direct key-based logins.
SSH_KEY_FILE = "/opt/patchcenter/keys/id_rsa_cybglobal.pem"
# Host contacted by the PSMP login path.
PSMP_HOST = "psmp.sanef.fr"
# The PSMP login name is built as "<CYBR_USER>@<TARGET_USER>@<fqdn>".
CYBR_USER = "CYBP01336"
# Account used on the audited server.
TARGET_USER = "cybsecope"
# Timeout handed to SSHClient.connect() for key-based logins.
SSH_TIMEOUT = 20

# DNS suffix associated with each environment name.
ENV_DOMAINS = {
    "prod": ".sanef.groupe",
    "preprod": ".sanef.groupe",
    "recette": ".sanef-rec.fr",
    "test": ".sanef-rec.fr",
    "dev": ".sanef-rec.fr",
}

# Any output line containing one of these fragments is dropped from the
# script output (login banner / session notice text).
BANNER_FILTERS = [
    "GROUPE SANEF",
    "propriete du Groupe",
    "accederait",
    "emprisonnement",
    "Article 323",
    "code penal",
    "Authorized uses only",
    "CyberArk",
    "This session",
    "session is being",
]

# Location of the audit shell script, relative to this module's directory.
SCRIPT_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "..",
    "scripts",
    "server_audit.sh",
)
def _load_script():
    """Return the whole content of the file at SCRIPT_PATH, decoded as UTF-8."""
    with open(SCRIPT_PATH, mode="r", encoding="utf-8") as script_file:
        content = script_file.read()
    return content
def _get_psmp_password(db=None):
    """Fetch the PSMP login password from the application's secret store.

    Args:
        db: Database handle forwarded to ``get_secret``. When falsy, no lookup
            is attempted.

    Returns:
        The value returned by ``get_secret(db, "ssh_pwd_default_pass")``, or
        ``None`` when ``db`` is falsy or the lookup fails for any reason.
    """
    if not db:
        return None
    try:
        # Imported inside the try so that an import failure is also reported
        # as "no password" instead of propagating.
        from .secrets_service import get_secret
        return get_secret(db, "ssh_pwd_default_pass")
    except Exception:
        # Best effort: callers treat None as "no password available". Log the
        # cause so a broken secret store does not go unnoticed.
        logging.getLogger(__name__).warning(
            "Unable to read secret 'ssh_pwd_default_pass'", exc_info=True
        )
        return None
# ── DETECTION ENV + SSH (pattern SANEF corrige) ──
def detect_env(hostname):
    """Derive the environment name from the second character of a hostname.

    The comparison is case-insensitive: p -> prod, i -> preprod, r -> recette,
    v -> test, d -> dev. Any other letter, or a hostname shorter than two
    characters, yields "recette".
    """
    env_by_letter = {
        "p": "prod",
        "i": "preprod",
        "r": "recette",
        "v": "test",
        "d": "dev",
    }
    # Slicing returns "" instead of raising when there is no second character.
    letter = hostname.lower()[1:2]
    return env_by_letter.get(letter, "recette")
def _load_key():
    """Load the private key stored at SSH_KEY_FILE.

    The file is tried as an RSA key, then Ed25519, then ECDSA; the first key
    class that manages to parse it wins.

    Returns:
        A paramiko key object, or ``None`` when paramiko is not installed, the
        key file does not exist, or none of the key classes can read it.
    """
    # Without paramiko the key classes below do not exist: report "no key"
    # instead of letting a NameError escape to the caller.
    if not PARAMIKO_OK or not os.path.exists(SSH_KEY_FILE):
        return None
    for key_cls in (paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey):
        try:
            return key_cls.from_private_key_file(SSH_KEY_FILE)
        except Exception:
            # This class could not read the file: try the next key type.
            continue
    return None
def _build_fqdn_candidates(hostname):
    """List the names to try, in order, when connecting to ``hostname``.

    Args:
        hostname: Short host name or fully qualified name.

    Returns:
        ``[hostname]`` unchanged when it already contains a dot. Otherwise the
        host name with both known DNS suffixes followed by the bare name. The
        ``.sanef.groupe`` suffix comes first when the second character is
        "p" or "i" (any case), the ``.sanef-rec.fr`` suffix first otherwise.
    """
    if "." in hostname:
        return [hostname]
    # Case-insensitive, like detect_env(): "VPAPP01" must be ordered the same
    # way as "vpapp01". The slice is "" for one-character names.
    env_letter = hostname[1:2].lower()
    groupe_fqdn = f"{hostname}.sanef.groupe"
    rec_fqdn = f"{hostname}.sanef-rec.fr"
    if env_letter in ("p", "i"):
        return [groupe_fqdn, rec_fqdn, hostname]
    return [rec_fqdn, groupe_fqdn, hostname]
def _try_psmp(fqdn, password):
    """Open an SSH session to ``fqdn`` through the PSMP host.

    The login name is "<CYBR_USER>@<TARGET_USER>@<fqdn>" and every interactive
    prompt is answered with ``password``.

    Args:
        fqdn: Name of the target server, embedded in the login name.
        password: Password sent to each prompt. When falsy nothing is tried.

    Returns:
        A ``paramiko.SSHClient`` wrapping the authenticated transport, or
        ``None`` when no password was given or any step failed.
    """
    if not password:
        return None
    transport = None
    try:
        username = f"{CYBR_USER}@{TARGET_USER}@{fqdn}"
        transport = paramiko.Transport((PSMP_HOST, 22))
        # NOTE(review): connect() is called without a host key, so the PSMP
        # host's identity is not verified here.
        transport.connect()

        def handler(title, instructions, prompt_list):
            # Same answer for every prompt of the interactive exchange.
            return [password] * len(prompt_list)

        transport.auth_interactive(username, handler)
        client = paramiko.SSHClient()
        # The already-authenticated transport is attached through the private
        # attribute; ssh_run_script() reads it back from there.
        client._transport = transport
        return client
    except Exception:
        # Release the socket opened above; otherwise every failed candidate
        # leaves a connection behind.
        if transport is not None:
            try:
                transport.close()
            except Exception:
                pass  # closing is best effort, the attempt has failed anyway
        return None
def _try_key(fqdn, key):
    """Open a direct SSH session to ``fqdn`` as TARGET_USER with a private key.

    Args:
        fqdn: Host to connect to on port 22.
        key: Private key object passed as ``pkey``. When falsy nothing is tried.

    Returns:
        A connected ``paramiko.SSHClient``, or ``None`` when no key was given
        or the connection failed.
    """
    if not key:
        return None
    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key, which
        # gives no protection against a spoofed server. Kept as is because
        # changing it would alter which hosts can be audited.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            fqdn,
            port=22,
            username=TARGET_USER,
            pkey=key,
            timeout=SSH_TIMEOUT,
            look_for_keys=False,  # only the supplied key is offered
            allow_agent=False,
        )
        return client
    except Exception:
        # A failed connect can leave the client holding an open transport.
        if client is not None:
            try:
                client.close()
            except Exception:
                pass  # closing is best effort, the attempt has failed anyway
        return None
def ssh_connect(hostname, password=None):
    """Connect to ``hostname``, trying each candidate name in turn.

    For every name returned by ``_build_fqdn_candidates`` the PSMP login is
    attempted first (only when ``password`` is truthy), then the key-based
    login (only when a key could be loaded).

    Returns:
        ``(client, None)`` on the first successful attempt, otherwise
        ``(None, message)`` where the message lists the names that were tried.
    """
    candidates = _build_fqdn_candidates(hostname)
    private_key = _load_key()
    # (credential, connector) pairs, in the order they are attempted.
    attempts = (
        (password, _try_psmp),
        (private_key, _try_key),
    )
    for target in candidates:
        for credential, connector in attempts:
            if not credential:
                continue
            session = connector(target, credential)
            if session:
                return session, None
    return None, f"Connexion impossible sur {candidates}"
def ssh_run_script(client, script_content, timeout=300):
    """Run a shell script on the remote host and return its filtered output.

    The script text is written to the standard input of ``bash -s`` and the
    channel is read until the remote side stops sending. Lines containing any
    BANNER_FILTERS fragment are removed from the result.

    Args:
        client: Object exposing an open transport as ``client._transport``.
        script_content: Script source, sent UTF-8 encoded.
        timeout: Value passed to the channel's ``settimeout``.

    Returns:
        ``(output, None)`` on success, ``("", "Sortie vide")`` when nothing but
        whitespace was received, or ``("", str(error))`` when an exception
        occurred.
    """
    chan = None
    try:
        chan = client._transport.open_session()
        chan.settimeout(timeout)
        chan.exec_command("bash -s")
        chan.sendall(script_content.encode("utf-8"))
        # Signal end of input so that bash terminates after the script.
        chan.shutdown_write()

        chunks = []
        while True:
            try:
                chunk = chan.recv(8192)
            except Exception:
                # Read failure (e.g. timeout): keep what was received so far.
                break
            if not chunk:
                break
            chunks.append(chunk)

        out_str = b"".join(chunks).decode("utf-8", errors="replace")
        if not out_str.strip():
            return "", "Sortie vide"
        kept = [
            line for line in out_str.splitlines()
            if not any(fragment in line for fragment in BANNER_FILTERS)
        ]
        return "\n".join(kept), None
    except Exception as e:
        return "", str(e)
    finally:
        # Always release the channel, including on the error paths above.
        if chan is not None:
            try:
                chan.close()
            except Exception:
                pass  # output (if any) has already been collected
# ── PARSING ──
# Report banner lines: (compiled pattern, key of the result dict it fills).
_AUDIT_HEADER_PATTERNS = (
    (re.compile(r"^# AUDIT COMPLET .+ (.+)$"), "hostname"),
    (re.compile(r"^# OS: (.+)$"), "os_release"),
    (re.compile(r"^# Kernel: (.+)$"), "kernel"),
    (re.compile(r"^# Uptime: (.+)$"), "uptime"),
)

# Section titles, checked in this order: (text searched in the line, section).
_AUDIT_SECTION_MARKERS = (
    ("1.1 SERVICES APPLICATIFS", "services"),
    ("1.2 PROCESSUS APPLICATIFS", "processes"),
    ("1.3 SERVICES EN ECHEC", "services_failed"),
    ("1.4 NEEDS-RESTARTING", "needs_restarting"),
    ("1.5 ESPACE DISQUE", "disk"),
    ("2.1 INTERFACES", "interfaces"),
    ("2.2 TABLE DE ROUTAGE", "routes"),
    ("2.3 PORTS EN ECOUTE", "listen_ports"),
    ("2.4 CONNEXIONS ETABLIES", "connections"),
    ("2.5 RESUME FLUX ENTRANTS", "flux_in"),
    ("2.6 RESUME FLUX SORTANTS", "flux_out"),
    ("2.7 CONNEXIONS EN ATTENTE", "conn_wait"),
    ("2.8 STATISTIQUES", "net_stats"),
    ("2.9 TRAFIC", "traffic"),
    ("2.10 FIREWALL", "firewall"),
    ("3.1 MATRICE", "correlation"),
    ("3.2 PROCESS SORTANTS", "outbound"),
)

# Data lines starting with one of these prefixes are column titles: skipped.
_AUDIT_TABLE_HEADERS = (
    "SERVICE|", "PID|PPID", "PROTO|", "DIRECTION|", "PORT|", "DEST_IP|",
    "METRIC|", "INTERFACE|", "DESTINATION|", "NUM|", "PROCESS|USER",
    "ZONE|", "Mont", "STATE|COUNT",
)


def _audit_col(parts, index, default=""):
    """Return ``parts[index]``, or ``default`` when the column is absent."""
    return parts[index] if len(parts) > index else default


def parse_audit_output(raw):
    """Parse the text produced by the audit script into a dictionary.

    The text is processed line by line. Banner lines ("# OS: ...") fill the
    scalar fields, section titles select the current section, and every other
    non-empty line is split on "|" and stored according to that section. Lines
    that do not have enough columns for their section are ignored.

    Args:
        raw: Script output as a single string.

    Returns:
        A dict with the keys hostname, os_release, kernel, uptime, services,
        processes, services_failed, needs_restarting, reboot_required,
        disk_usage, interfaces, routes, listen_ports, connections, flux_in,
        flux_out, conn_wait, net_stats, traffic, firewall, correlation_matrix
        and outbound_only. Values are strings exactly as found in the text,
        except ``reboot_required`` (bool) and the disk ``pct`` field (int).
    """
    result = {
        "hostname": "", "os_release": "", "kernel": "", "uptime": "",
        "services": [], "processes": [], "services_failed": "",
        "needs_restarting": "", "reboot_required": False, "disk_usage": [],
        "interfaces": [], "routes": [], "listen_ports": [],
        "connections": [], "flux_in": [], "flux_out": [],
        "conn_wait": [], "net_stats": {}, "traffic": [],
        "firewall": {"policy": {}, "input": [], "output": [], "firewalld": []},
        "correlation_matrix": [], "outbound_only": [],
    }
    failed_lines = []
    restart_lines = []
    section = None
    firewall_sub = None

    for line in raw.splitlines():
        ls = line.strip()

        # 1) Banner lines.
        banner_found = False
        for pattern, key in _AUDIT_HEADER_PATTERNS:
            m = pattern.match(ls)
            if m:
                result[key] = m.group(1)
                banner_found = True
                break
        if banner_found:
            continue

        # 2) Section titles, then separators that end the current section.
        new_section = next(
            (name for marker, name in _AUDIT_SECTION_MARKERS if marker in ls),
            None,
        )
        if new_section is not None:
            section = new_section
            if new_section == "firewall":
                firewall_sub = None
            continue
        if ls.startswith(("===", "###")):
            section = None
            continue

        # 3) Blank lines and column-title lines carry no data.
        if not ls or ls.startswith(_AUDIT_TABLE_HEADERS):
            continue

        parts = ls.split("|")
        n = len(parts)

        if section == "services":
            if n >= 2:
                result["services"].append({
                    "name": parts[0],
                    "enabled": parts[1],
                    "pid": _audit_col(parts, 2),
                    "user": _audit_col(parts, 3),
                    "exec": _audit_col(parts, 4),
                })
        elif section == "processes":
            if n >= 6:
                result["processes"].append({
                    "pid": parts[0],
                    "ppid": parts[1],
                    "user": parts[2],
                    "exe": parts[3],
                    "cwd": parts[4],
                    "cmdline": parts[5],
                    "restart_hint": _audit_col(parts, 6),
                })
        elif section == "services_failed":
            if ls != "Aucun service en echec":
                failed_lines.append(ls)
        elif section == "needs_restarting":
            restart_lines.append(ls)
            if "EXIT_CODE=1" in ls:
                result["reboot_required"] = True
        elif section == "disk":
            # Disk lines are whitespace separated, not "|" separated.
            p = ls.split()
            if len(p) >= 5 and "%" in p[-1]:
                try:
                    pct = int(p[4].replace("%", ""))
                except ValueError:
                    # Percentage column is not a number: skip this line.
                    pass
                else:
                    result["disk_usage"].append({
                        "mount": p[0],
                        "size": p[1],
                        "used": p[2],
                        "avail": p[3],
                        "pct": pct,
                    })
        elif section == "interfaces":
            if n >= 3:
                result["interfaces"].append({
                    "iface": parts[0],
                    "ip": parts[1],
                    "mask": parts[2],
                    "state": _audit_col(parts, 3),
                    "mac": _audit_col(parts, 4),
                })
        elif section == "routes":
            if n >= 3:
                result["routes"].append({
                    "dest": parts[0],
                    "gw": parts[1],
                    "iface": parts[2],
                    "metric": _audit_col(parts, 3),
                })
        elif section == "listen_ports":
            if n >= 3:
                result["listen_ports"].append({
                    "proto": parts[0],
                    "addr_port": parts[1],
                    "pid": parts[2],
                    "process": _audit_col(parts, 3),
                    "user": _audit_col(parts, 4),
                    "service": _audit_col(parts, 5),
                })
        elif section == "connections":
            if n >= 5:
                result["connections"].append({
                    "direction": parts[0],
                    "proto": parts[1],
                    "local": parts[2],
                    "remote": parts[3],
                    "pid": parts[4],
                    "process": _audit_col(parts, 5),
                    "user": _audit_col(parts, 6),
                    "state": _audit_col(parts, 7),
                })
        elif section == "flux_in":
            if n >= 3:
                result["flux_in"].append({
                    "port": parts[0],
                    "service": parts[1],
                    "process": parts[2],
                    "count": _audit_col(parts, 3, "0"),
                    "sources": _audit_col(parts, 4),
                })
        elif section == "flux_out":
            if n >= 3:
                result["flux_out"].append({
                    "dest_ip": parts[0],
                    "dest_port": parts[1],
                    "service": parts[2],
                    "process": _audit_col(parts, 3),
                    "count": _audit_col(parts, 4, "1"),
                })
        elif section == "conn_wait":
            if n == 2:
                result["conn_wait"].append({"state": parts[0], "count": parts[1]})
        elif section == "net_stats":
            if n == 2:
                result["net_stats"][parts[0].strip()] = parts[1].strip()
        elif section == "traffic":
            if n >= 5:
                result["traffic"].append({
                    "iface": parts[0],
                    "rx_bytes": parts[1],
                    "rx_pkt": parts[2],
                    "rx_err": parts[3],
                    "tx_bytes": parts[4],
                    "tx_pkt": _audit_col(parts, 5),
                    "tx_err": _audit_col(parts, 6),
                })
        elif section == "firewall":
            firewall = result["firewall"]
            # Sub-section titles are tested first; a title line is never data.
            if "POLICY" in ls:
                firewall_sub = "policy"
            elif "INPUT" in ls and "---" in ls:
                firewall_sub = "input"
            elif "OUTPUT" in ls and "---" in ls:
                firewall_sub = "output"
            elif "FIREWALLD" in ls:
                firewall_sub = "firewalld"
            elif firewall_sub == "policy" and n == 2:
                firewall["policy"][parts[0]] = parts[1]
            elif firewall_sub == "input" and n >= 3:
                firewall["input"].append(ls)
            elif firewall_sub == "output" and n >= 3:
                firewall["output"].append(ls)
            elif firewall_sub == "firewalld" and n >= 2:
                firewall["firewalld"].append({
                    "zone": parts[0],
                    "services": parts[1],
                    "ports": _audit_col(parts, 2),
                })
        elif section == "correlation":
            if n >= 4:
                result["correlation_matrix"].append({
                    "process": parts[0],
                    "user": parts[1],
                    "pid": parts[2],
                    "listen_ports": parts[3],
                    "conn_in": _audit_col(parts, 4, "0"),
                    "conn_out": _audit_col(parts, 5, "0"),
                    "remote_dests": _audit_col(parts, 6),
                })
        elif section == "outbound":
            if n >= 3:
                result["outbound_only"].append({
                    "process": parts[0],
                    "user": parts[1],
                    "pid": parts[2],
                    "dests": _audit_col(parts, 3),
                })

    result["services_failed"] = "\n".join(failed_lines).strip()
    result["needs_restarting"] = "\n".join(restart_lines).strip()
    return result
# ── STOCKAGE DB ──
def _resolve_server_id(db, hostname):
    """Return the ``servers.id`` whose hostname matches, or ``None``.

    Only the part of ``hostname`` before the first dot is compared, and the
    comparison ignores case.
    """
    short_name = hostname.split(".")[0]
    query = text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)")
    match = db.execute(query, {"h": short_name}).fetchone()
    if not match:
        return None
    return match.id
def _resolve_dest_server(db, dest_ip):
    """Find the known server that owns a destination IP address.

    Args:
        db: Database handle exposing ``execute``.
        dest_ip: Address as found in the connection list. It may be wrapped in
            square brackets and may be an IPv4-mapped IPv6 address
            ("::ffff:a.b.c.d").

    Returns:
        ``(server_id, hostname)`` of the first matching row of ``server_ips``,
        or ``(None, None)`` when the value is empty, is not an IP address, is
        a plain IPv6 address, has no match, or the query fails.
    """
    import ipaddress  # function-scope import: only this helper needs it

    candidate = (dest_ip or "").strip().strip("[]")
    try:
        address = ipaddress.ip_address(candidate)
    except ValueError:
        # Not an address at all ("*", "10.0.0.1%eth0", ""...). Never hand such
        # a value to CAST(... AS inet): the statement would fail in the
        # database.
        return (None, None)
    if address.version == 6:
        # Only IPv4-mapped addresses are looked up; plain IPv6 is skipped.
        address = address.ipv4_mapped
        if address is None:
            return (None, None)
    clean_ip = str(address)
    try:
        row = db.execute(text("""
            SELECT s.id, s.hostname FROM servers s
            JOIN server_ips si ON s.id = si.server_id
            WHERE si.ip_address = CAST(:ip AS inet)
            LIMIT 1
        """), {"ip": clean_ip}).fetchone()
        return (row.id, row.hostname) if row else (None, None)
    except Exception:
        # Best effort: an unresolved destination is stored without server id.
        # NOTE(review): a failed statement may leave the surrounding
        # transaction unusable — confirm how callers recover.
        logging.getLogger(__name__).warning(
            "Destination lookup failed for %s", clean_ip, exc_info=True
        )
        return (None, None)
def save_audit_to_db(db, parsed, raw_output="", status="ok", error_msg=None):
    """Insert one audit into ``server_audit_full`` and build its flow map.

    Text fields are stored as is; list and dict fields are serialised with
    ``json.dumps``. After the insert, ``_build_flow_map`` is called with the
    new audit id. Nothing is committed here.

    Args:
        db: Database handle exposing ``execute``.
        parsed: Audit dictionary, as produced by ``parse_audit_output``.
        raw_output: Raw script output stored alongside the parsed data.
        status: Value of the ``status`` column.
        error_msg: Value of the ``error_msg`` column.

    Returns:
        The id of the inserted row, or ``None`` when ``parsed`` has no
        hostname (nothing is written in that case).
    """
    hostname = parsed.get("hostname", "")
    if not hostname:
        return None
    server_id = _resolve_server_id(db, hostname)

    # Bind name -> parsed key, for the columns stored as plain text.
    text_columns = (
        ("os", "os_release"),
        ("k", "kernel"),
        ("up", "uptime"),
        ("sf", "services_failed"),
        ("nr", "needs_restarting"),
    )
    # Bind name -> (parsed key, kind of empty container), stored as JSON.
    json_columns = (
        ("svc", "services", list),
        ("proc", "processes", list),
        ("du", "disk_usage", list),
        ("iface", "interfaces", list),
        ("rt", "routes", list),
        ("lp", "listen_ports", list),
        ("conn", "connections", list),
        ("fi", "flux_in", list),
        ("fo", "flux_out", list),
        ("cw", "conn_wait", list),
        ("ns", "net_stats", dict),
        ("tr", "traffic", list),
        ("fw", "firewall", dict),
        ("cm", "correlation_matrix", list),
        ("ob", "outbound_only", list),
    )

    params = {
        "sid": server_id,
        "hn": hostname,
        "rr": parsed.get("reboot_required", False),
        "raw": raw_output,
        "st": status,
        "err": error_msg,
    }
    for bind_name, key in text_columns:
        params[bind_name] = parsed.get(key, "")
    for bind_name, key, empty_factory in json_columns:
        params[bind_name] = json.dumps(parsed.get(key, empty_factory()))

    insert_stmt = text("""
        INSERT INTO server_audit_full (
            server_id, hostname, audit_date, os_release, kernel, uptime,
            services, processes, services_failed, needs_restarting, reboot_required,
            disk_usage, interfaces, routes, listen_ports, connections,
            flux_in, flux_out, conn_wait, net_stats, traffic, firewall,
            correlation_matrix, outbound_only, raw_output, status, error_msg
        ) VALUES (
            :sid, :hn, NOW(), :os, :k, :up,
            :svc, :proc, :sf, :nr, :rr,
            :du, :iface, :rt, :lp, :conn,
            :fi, :fo, :cw, :ns, :tr, :fw,
            :cm, :ob, :raw, :st, :err
        ) RETURNING id
    """)
    inserted = db.execute(insert_stmt, params).fetchone()
    audit_id = inserted.id
    _build_flow_map(db, audit_id, hostname, server_id, parsed)
    return audit_id
# Splits "host:port" on the LAST colon, so "[::ffff:10.0.0.1]:443" also works.
_REMOTE_ENDPOINT_RE = re.compile(r'^(.+):(\d+)$')


def _build_flow_map(db, audit_id, hostname, server_id, parsed):
    """Insert one ``network_flow_map`` row per non-loopback connection.

    The source IP is the first interface address that is neither empty nor
    127.0.0.1 (empty string when there is none). Connections whose remote
    endpoint cannot be split into address and port, or whose address is a
    loopback address, are skipped. Nothing is committed here.

    Args:
        db: Database handle exposing ``execute``.
        audit_id: Id of the audit the rows belong to.
        hostname: Source host name stored with every row.
        server_id: Source server id stored with every row (may be ``None``).
        parsed: Audit dictionary; its "interfaces" and "connections" entries
            are read.
    """
    local_ips = []
    for iface in parsed.get("interfaces", []):
        # .get(): interface entries coming from imported JSON may lack "ip".
        ip = iface.get("ip")
        if ip and ip != "127.0.0.1":
            local_ips.append(ip)
    source_ip = local_ips[0] if local_ips else ""

    # Many connections share the same destination: resolve each address once.
    resolved = {}
    for conn in parsed.get("connections", []):
        remote = conn.get("remote") or ""
        m = _REMOTE_ENDPOINT_RE.match(remote)
        if not m:
            continue
        dest_ip = m.group(1)
        dest_port = int(m.group(2))
        if dest_ip.startswith("127.") or dest_ip == "::1":
            continue
        if dest_ip not in resolved:
            resolved[dest_ip] = _resolve_dest_server(db, dest_ip)
        dest_server_id, dest_hostname = resolved[dest_ip]
        db.execute(text("""
            INSERT INTO network_flow_map (
                audit_id, source_server_id, source_hostname, source_ip,
                dest_ip, dest_port, dest_hostname, dest_server_id,
                process_name, process_user, direction,
                connection_count, state, audit_date
            ) VALUES (
                :aid, :ssid, :shn, :sip,
                :dip, :dp, :dhn, :dsid,
                :pn, :pu, :dir, 1, :st, NOW()
            )
        """), {
            "aid": audit_id, "ssid": server_id, "shn": hostname, "sip": source_ip,
            "dip": dest_ip, "dp": dest_port, "dhn": dest_hostname, "dsid": dest_server_id,
            "pn": conn.get("process", ""), "pu": conn.get("user", ""),
            "dir": conn.get("direction", ""), "st": conn.get("state", ""),
        })
# ── IMPORT JSON (depuis standalone) ──
def import_json_report(db, json_data):
    """Store every server entry of a JSON report as an audit.

    Entries whose status is "error" are counted and skipped; entries without
    a hostname are skipped silently. Missing fields get an empty default. The
    database handle is committed once all entries have been processed.

    Args:
        db: Database handle exposing ``execute`` and ``commit``.
        json_data: Report dictionary; its "servers" list is read.

    Returns:
        ``(imported, errors)``: number of entries stored and number of entries
        flagged as errors.
    """
    imported = 0
    errors = 0
    for entry in json_data.get("servers", []):
        if entry.get("status") == "error":
            errors += 1
            continue
        if not entry.get("hostname", ""):
            continue
        # Fresh defaults for each entry, then overlay what the report provides.
        record = {
            "hostname": "", "os_release": "", "kernel": "", "uptime": "",
            "services": [], "processes": [], "services_failed": "",
            "needs_restarting": "", "reboot_required": False, "disk_usage": [],
            "interfaces": [], "routes": [], "listen_ports": [],
            "connections": [], "flux_in": [], "flux_out": [],
            "conn_wait": [], "net_stats": {}, "traffic": [],
            "firewall": {}, "correlation_matrix": [], "outbound_only": [],
        }
        for field in record:
            if field in entry:
                record[field] = entry[field]
        save_audit_to_db(db, record)
        imported += 1
    db.commit()
    return imported, errors
# ── REQUETES ──
def get_latest_audits(db, limit=100):
    """Return the most recent successful audit of each hostname.

    Each row carries the audit's identifying columns plus the number of
    services, listening ports, connections and processes it recorded. At most
    ``limit`` rows are returned, ordered by hostname.
    """
    query = text("""
        SELECT DISTINCT ON (hostname) id, server_id, hostname, audit_date,
            os_release, kernel, uptime, status, reboot_required,
            jsonb_array_length(COALESCE(services, '[]')) as svc_count,
            jsonb_array_length(COALESCE(listen_ports, '[]')) as port_count,
            jsonb_array_length(COALESCE(connections, '[]')) as conn_count,
            jsonb_array_length(COALESCE(processes, '[]')) as proc_count
        FROM server_audit_full
        WHERE status = 'ok'
        ORDER BY hostname, audit_date DESC
        LIMIT :lim
    """)
    rows = db.execute(query, {"lim": limit}).fetchall()
    return rows
def get_audit_detail(db, audit_id):
    """Return the full ``server_audit_full`` row with this id, or ``None``."""
    query = text("SELECT * FROM server_audit_full WHERE id = :id")
    result = db.execute(query, {"id": audit_id})
    return result.fetchone()
def get_flow_map(db):
    """Return the aggregated network flows of every host's latest audit.

    Only the most recent successful audit of each hostname is considered.
    Identical flows are grouped and counted in the ``cnt`` column; rows are
    ordered by source hostname.
    """
    query = text("""
        SELECT source_hostname, source_ip, dest_ip, dest_port,
            dest_hostname, process_name, direction, state,
            COUNT(*) as cnt
        FROM network_flow_map nfm
        JOIN server_audit_full saf ON nfm.audit_id = saf.id
        WHERE saf.id IN (
            SELECT DISTINCT ON (hostname) id FROM server_audit_full
            WHERE status = 'ok' ORDER BY hostname, audit_date DESC
        )
        GROUP BY source_hostname, source_ip, dest_ip, dest_port,
            dest_hostname, process_name, direction, state
        ORDER BY source_hostname
    """)
    rows = db.execute(query).fetchall()
    return rows
def get_flow_map_for_server(db, hostname):
    """Return the network flows recorded by the latest audit of one host.

    The latest audit is selected by exact hostname match, whatever its status.
    Rows are ordered by direction (descending) then destination IP.
    """
    query = text("""
        SELECT source_hostname, source_ip, dest_ip, dest_port,
            dest_hostname, process_name, direction, state
        FROM network_flow_map
        WHERE audit_id = (
            SELECT id FROM server_audit_full WHERE hostname = :h
            ORDER BY audit_date DESC LIMIT 1
        )
        ORDER BY direction DESC, dest_ip
    """)
    rows = db.execute(query, {"h": hostname}).fetchall()
    return rows
def get_flow_map_for_domain(db, domain_code):
    """Return the network flows of the servers attached to one domain.

    Servers are linked to the domain through ``domain_environments``. Only the
    most recent successful audit of each hostname is considered; rows are
    ordered by source hostname.
    """
    query = text("""
        SELECT nfm.source_hostname, nfm.source_ip, nfm.dest_ip, nfm.dest_port,
            nfm.dest_hostname, nfm.process_name, nfm.direction, nfm.state
        FROM network_flow_map nfm
        JOIN server_audit_full saf ON nfm.audit_id = saf.id
        JOIN servers s ON saf.server_id = s.id
        JOIN domain_environments de ON s.domain_env_id = de.id
        JOIN domains d ON de.domain_id = d.id
        WHERE d.code = :dc
        AND saf.id IN (
            SELECT DISTINCT ON (hostname) id FROM server_audit_full
            WHERE status = 'ok' ORDER BY hostname, audit_date DESC
        )
        ORDER BY nfm.source_hostname
    """)
    rows = db.execute(query, {"dc": domain_code}).fetchall()
    return rows
def _json_list(value):
    """Return ``value`` if it is already a list, else decode it as JSON text."""
    return value if isinstance(value, list) else json.loads(value or "[]")


def get_app_map(db):
    """Group servers and listening ports by application.

    The application name is the path component following "/applis/" in a
    process's working directory. For each application the result lists every
    host running such a process (once per host, described by the first
    matching process) and the listening addresses whose pid matches one of
    those processes. Only the latest successful audit of each hostname is
    read.

    Args:
        db: Database handle exposing ``execute``.

    Returns:
        ``{app_name: {"servers": [...], "ports": [...]}}`` where each server
        entry has the keys hostname, server_id, user, cmdline and restart_hint
        (the last two cut to 100 characters), and ports is a sorted list.
    """
    audits = db.execute(text("""
        SELECT DISTINCT ON (hostname) hostname, server_id, processes, listen_ports
        FROM server_audit_full WHERE status = 'ok'
        ORDER BY hostname, audit_date DESC
    """)).fetchall()

    app_groups = {}
    hosts_by_app = {}  # app name -> hostnames already listed for that app
    for audit in audits:
        processes = _json_list(audit.processes)
        # Decoded lazily, once per audit, the first time a process matches.
        listen = None
        for proc in processes:
            cwd = proc.get("cwd") or ""
            m = re.search(r'/applis/([^/]+)', cwd)
            if not m:
                continue
            app_name = m.group(1)
            group = app_groups.setdefault(app_name, {"servers": [], "ports": set()})
            known_hosts = hosts_by_app.setdefault(app_name, set())
            if audit.hostname not in known_hosts:
                known_hosts.add(audit.hostname)
                group["servers"].append({
                    "hostname": audit.hostname,
                    "server_id": audit.server_id,
                    "user": proc.get("user", ""),
                    "cmdline": (proc.get("cmdline") or "")[:100],
                    "restart_hint": (proc.get("restart_hint") or "")[:100],
                })
            if listen is None:
                listen = _json_list(audit.listen_ports)
            pid = proc.get("pid", "")
            for lp in listen:
                if lp.get("pid") == pid:
                    group["ports"].add(lp.get("addr_port", ""))

    # Sets are not JSON serialisable; sorting also makes the order stable.
    for group in app_groups.values():
        group["ports"] = sorted(group["ports"], key=str)
    return app_groups