"""Service audit complet serveur — applicatif + reseau + correlation + carte flux Adapte du standalone SANEF corrige pour PatchCenter (FastAPI/PostgreSQL) """ import json import re import os import socket import logging from datetime import datetime from sqlalchemy import text logging.getLogger("paramiko").setLevel(logging.CRITICAL) logging.getLogger("paramiko.transport").setLevel(logging.CRITICAL) try: import paramiko PARAMIKO_OK = True except ImportError: PARAMIKO_OK = False SSH_KEY_FILE = "/opt/patchcenter/keys/id_rsa_cybglobal.pem" PSMP_HOST = "psmp.sanef.fr" CYBR_USER = "CYBP01336" TARGET_USER = "cybsecope" SSH_TIMEOUT = 20 ENV_DOMAINS = { "prod": ".sanef.groupe", "preprod": ".sanef.groupe", "recette": ".sanef-rec.fr", "test": ".sanef-rec.fr", "dev": ".sanef-rec.fr", } BANNER_FILTERS = [ "GROUPE SANEF", "propriete du Groupe", "accederait", "emprisonnement", "Article 323", "code penal", "Authorized uses only", "CyberArk", "This session", "session is being", ] SCRIPT_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "scripts", "server_audit.sh") def _load_script(): with open(SCRIPT_PATH, "r", encoding="utf-8") as f: return f.read() def _get_psmp_password(db=None): if not db: return None try: from .secrets_service import get_secret return get_secret(db, "ssh_pwd_default_pass") except Exception: return None # ── DETECTION ENV + SSH (pattern SANEF corrige) ── def detect_env(hostname): h = hostname.lower() c = h[1] if len(h) > 1 else "" if c == "p": return "prod" elif c == "i": return "preprod" elif c == "r": return "recette" elif c == "v": return "test" elif c == "d": return "dev" return "recette" def _load_key(): if not os.path.exists(SSH_KEY_FILE): return None for cls in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey]: try: return cls.from_private_key_file(SSH_KEY_FILE) except Exception: continue return None def _build_fqdn_candidates(hostname): if "." in hostname: return [hostname] c = hostname[1] if len(hostname) > 1 else "" if c in ("p", "i"): return [f"{hostname}.sanef.groupe", f"{hostname}.sanef-rec.fr", hostname] else: return [f"{hostname}.sanef-rec.fr", f"{hostname}.sanef.groupe", hostname] def _try_psmp(fqdn, password): if not password: return None try: username = f"{CYBR_USER}@{TARGET_USER}@{fqdn}" transport = paramiko.Transport((PSMP_HOST, 22)) transport.connect() def handler(title, instructions, prompt_list): return [password] * len(prompt_list) transport.auth_interactive(username, handler) client = paramiko.SSHClient() client._transport = transport return client except Exception: return None def _try_key(fqdn, key): if not key: return None try: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(fqdn, port=22, username=TARGET_USER, pkey=key, timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False) return client except Exception: return None def ssh_connect(hostname, password=None): fqdn_candidates = _build_fqdn_candidates(hostname) key = _load_key() for fqdn in fqdn_candidates: if password: client = _try_psmp(fqdn, password) if client: return client, None if key: client = _try_key(fqdn, key) if client: return client, None return None, f"Connexion impossible sur {fqdn_candidates}" def ssh_run_script(client, script_content, timeout=300): try: chan = client._transport.open_session() chan.settimeout(timeout) chan.exec_command("bash -s") chan.sendall(script_content.encode("utf-8")) chan.shutdown_write() out = b"" while True: try: chunk = chan.recv(8192) if not chunk: break out += chunk except Exception: break chan.close() out_str = out.decode("utf-8", errors="replace") if not out_str.strip(): return "", "Sortie vide" lines = [l for l in out_str.splitlines() if not any(b in l for b in BANNER_FILTERS)] return "\n".join(lines), None except Exception as e: return "", str(e) # ── PARSING ── def parse_audit_output(raw): result = { "hostname": "", "os_release": "", "kernel": "", "uptime": "", "services": [], "processes": [], "services_failed": "", "needs_restarting": "", "reboot_required": False, "disk_usage": [], "interfaces": [], "routes": [], "listen_ports": [], "connections": [], "flux_in": [], "flux_out": [], "conn_wait": [], "net_stats": {}, "traffic": [], "firewall": {"policy": {}, "input": [], "output": [], "firewalld": []}, "correlation_matrix": [], "outbound_only": [], } section = None firewall_sub = None for line in raw.splitlines(): ls = line.strip() m = re.match(r"^# AUDIT COMPLET .+ (.+)$", ls) if m: result["hostname"] = m.group(1); continue m = re.match(r"^# OS: (.+)$", ls) if m: result["os_release"] = m.group(1); continue m = re.match(r"^# Kernel: (.+)$", ls) if m: result["kernel"] = m.group(1); continue m = re.match(r"^# Uptime: (.+)$", ls) if m: result["uptime"] = m.group(1); continue if "1.1 SERVICES APPLICATIFS" in ls: section = "services"; continue elif "1.2 PROCESSUS APPLICATIFS" in ls: section = "processes"; continue elif "1.3 SERVICES EN ECHEC" in ls: section = "services_failed"; continue elif "1.4 NEEDS-RESTARTING" in ls: section = "needs_restarting"; continue elif "1.5 ESPACE DISQUE" in ls: section = "disk"; continue elif "2.1 INTERFACES" in ls: section = "interfaces"; continue elif "2.2 TABLE DE ROUTAGE" in ls: section = "routes"; continue elif "2.3 PORTS EN ECOUTE" in ls: section = "listen_ports"; continue elif "2.4 CONNEXIONS ETABLIES" in ls: section = "connections"; continue elif "2.5 RESUME FLUX ENTRANTS" in ls: section = "flux_in"; continue elif "2.6 RESUME FLUX SORTANTS" in ls: section = "flux_out"; continue elif "2.7 CONNEXIONS EN ATTENTE" in ls: section = "conn_wait"; continue elif "2.8 STATISTIQUES" in ls: section = "net_stats"; continue elif "2.9 TRAFIC" in ls: section = "traffic"; continue elif "2.10 FIREWALL" in ls: section = "firewall"; firewall_sub = None; continue elif "3.1 MATRICE" in ls: section = "correlation"; continue elif "3.2 PROCESS SORTANTS" in ls: section = "outbound"; continue elif ls.startswith("===") or ls.startswith("###"): section = None; continue if not ls: continue headers = ["SERVICE|","PID|PPID","PROTO|","DIRECTION|","PORT|","DEST_IP|", "METRIC|","INTERFACE|","DESTINATION|","NUM|","PROCESS|USER", "ZONE|","Mont","STATE|COUNT"] if any(ls.startswith(h) for h in headers): continue parts = ls.split("|") if section == "services" and len(parts) >= 2: result["services"].append({"name":parts[0],"enabled":parts[1],"pid":parts[2] if len(parts)>2 else "","user":parts[3] if len(parts)>3 else "","exec":parts[4] if len(parts)>4 else ""}) elif section == "processes" and len(parts) >= 6: result["processes"].append({"pid":parts[0],"ppid":parts[1],"user":parts[2],"exe":parts[3],"cwd":parts[4],"cmdline":parts[5],"restart_hint":parts[6] if len(parts)>6 else ""}) elif section == "services_failed": if ls != "Aucun service en echec": result["services_failed"] += ls + "\n" elif section == "needs_restarting": result["needs_restarting"] += ls + "\n" if "EXIT_CODE=1" in ls: result["reboot_required"] = True elif section == "disk": p = ls.split() if len(p) >= 5 and "%" in p[-1]: try: result["disk_usage"].append({"mount":p[0],"size":p[1],"used":p[2],"avail":p[3],"pct":int(p[4].replace("%",""))}) except: pass elif section == "interfaces" and len(parts) >= 3: result["interfaces"].append({"iface":parts[0],"ip":parts[1],"mask":parts[2],"state":parts[3] if len(parts)>3 else "","mac":parts[4] if len(parts)>4 else ""}) elif section == "routes" and len(parts) >= 3: result["routes"].append({"dest":parts[0],"gw":parts[1],"iface":parts[2],"metric":parts[3] if len(parts)>3 else ""}) elif section == "listen_ports" and len(parts) >= 3: result["listen_ports"].append({"proto":parts[0],"addr_port":parts[1],"pid":parts[2],"process":parts[3] if len(parts)>3 else "","user":parts[4] if len(parts)>4 else "","service":parts[5] if len(parts)>5 else ""}) elif section == "connections" and len(parts) >= 5: result["connections"].append({"direction":parts[0],"proto":parts[1],"local":parts[2],"remote":parts[3],"pid":parts[4],"process":parts[5] if len(parts)>5 else "","user":parts[6] if len(parts)>6 else "","state":parts[7] if len(parts)>7 else ""}) elif section == "flux_in" and len(parts) >= 3: result["flux_in"].append({"port":parts[0],"service":parts[1],"process":parts[2],"count":parts[3] if len(parts)>3 else "0","sources":parts[4] if len(parts)>4 else ""}) elif section == "flux_out" and len(parts) >= 3: result["flux_out"].append({"dest_ip":parts[0],"dest_port":parts[1],"service":parts[2],"process":parts[3] if len(parts)>3 else "","count":parts[4] if len(parts)>4 else "1"}) elif section == "conn_wait" and len(parts) == 2: result["conn_wait"].append({"state":parts[0],"count":parts[1]}) elif section == "net_stats" and len(parts) == 2: result["net_stats"][parts[0].strip()] = parts[1].strip() elif section == "traffic" and len(parts) >= 5: result["traffic"].append({"iface":parts[0],"rx_bytes":parts[1],"rx_pkt":parts[2],"rx_err":parts[3],"tx_bytes":parts[4],"tx_pkt":parts[5] if len(parts)>5 else "","tx_err":parts[6] if len(parts)>6 else ""}) elif section == "firewall": if "POLICY" in ls: firewall_sub = "policy"; continue elif "INPUT" in ls and "---" in ls: firewall_sub = "input"; continue elif "OUTPUT" in ls and "---" in ls: firewall_sub = "output"; continue elif "FIREWALLD" in ls: firewall_sub = "firewalld"; continue if firewall_sub == "policy" and len(parts) == 2: result["firewall"]["policy"][parts[0]] = parts[1] elif firewall_sub == "input" and len(parts) >= 3: result["firewall"]["input"].append(ls) elif firewall_sub == "output" and len(parts) >= 3: result["firewall"]["output"].append(ls) elif firewall_sub == "firewalld" and len(parts) >= 2: result["firewall"]["firewalld"].append({"zone":parts[0],"services":parts[1],"ports":parts[2] if len(parts)>2 else ""}) elif section == "correlation" and len(parts) >= 4: result["correlation_matrix"].append({"process":parts[0],"user":parts[1],"pid":parts[2],"listen_ports":parts[3],"conn_in":parts[4] if len(parts)>4 else "0","conn_out":parts[5] if len(parts)>5 else "0","remote_dests":parts[6] if len(parts)>6 else ""}) elif section == "outbound" and len(parts) >= 3: result["outbound_only"].append({"process":parts[0],"user":parts[1],"pid":parts[2],"dests":parts[3] if len(parts)>3 else ""}) result["services_failed"] = result["services_failed"].strip() result["needs_restarting"] = result["needs_restarting"].strip() return result # ── STOCKAGE DB ── def _resolve_server_id(db, hostname): srv = db.execute(text( "SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)" ), {"h": hostname.split(".")[0]}).fetchone() return srv.id if srv else None def _resolve_dest_server(db, dest_ip): # Nettoyer l'IP (retirer IPv6-mapped prefix, brackets) clean_ip = dest_ip.replace("[::ffff:", "").replace("]", "").strip() if not clean_ip or ":" in clean_ip: return (None, None) # IPv6 pure, skip try: row = db.execute(text(""" SELECT s.id, s.hostname FROM servers s JOIN server_ips si ON s.id = si.server_id WHERE si.ip_address = CAST(:ip AS inet) LIMIT 1 """), {"ip": clean_ip}).fetchone() return (row.id, row.hostname) if row else (None, None) except Exception: return (None, None) def save_audit_to_db(db, parsed, raw_output="", status="ok", error_msg=None): hostname = parsed.get("hostname", "") if not hostname: return None server_id = _resolve_server_id(db, hostname) row = db.execute(text(""" INSERT INTO server_audit_full ( server_id, hostname, audit_date, os_release, kernel, uptime, services, processes, services_failed, needs_restarting, reboot_required, disk_usage, interfaces, routes, listen_ports, connections, flux_in, flux_out, conn_wait, net_stats, traffic, firewall, correlation_matrix, outbound_only, raw_output, status, error_msg ) VALUES ( :sid, :hn, NOW(), :os, :k, :up, :svc, :proc, :sf, :nr, :rr, :du, :iface, :rt, :lp, :conn, :fi, :fo, :cw, :ns, :tr, :fw, :cm, :ob, :raw, :st, :err ) RETURNING id """), { "sid": server_id, "hn": hostname, "os": parsed.get("os_release", ""), "k": parsed.get("kernel", ""), "up": parsed.get("uptime", ""), "svc": json.dumps(parsed.get("services", [])), "proc": json.dumps(parsed.get("processes", [])), "sf": parsed.get("services_failed", ""), "nr": parsed.get("needs_restarting", ""), "rr": parsed.get("reboot_required", False), "du": json.dumps(parsed.get("disk_usage", [])), "iface": json.dumps(parsed.get("interfaces", [])), "rt": json.dumps(parsed.get("routes", [])), "lp": json.dumps(parsed.get("listen_ports", [])), "conn": json.dumps(parsed.get("connections", [])), "fi": json.dumps(parsed.get("flux_in", [])), "fo": json.dumps(parsed.get("flux_out", [])), "cw": json.dumps(parsed.get("conn_wait", [])), "ns": json.dumps(parsed.get("net_stats", {})), "tr": json.dumps(parsed.get("traffic", [])), "fw": json.dumps(parsed.get("firewall", {})), "cm": json.dumps(parsed.get("correlation_matrix", [])), "ob": json.dumps(parsed.get("outbound_only", [])), "raw": raw_output, "st": status, "err": error_msg, }).fetchone() audit_id = row.id _build_flow_map(db, audit_id, hostname, server_id, parsed) return audit_id def _build_flow_map(db, audit_id, hostname, server_id, parsed): local_ips = [i["ip"] for i in parsed.get("interfaces", []) if i["ip"] != "127.0.0.1"] source_ip = local_ips[0] if local_ips else "" for conn in parsed.get("connections", []): remote = conn.get("remote", "") m = re.match(r'^(.+):(\d+)$', remote) if not m: continue dest_ip = m.group(1) dest_port = int(m.group(2)) if dest_ip.startswith("127.") or dest_ip == "::1": continue dest_server_id, dest_hostname = _resolve_dest_server(db, dest_ip) db.execute(text(""" INSERT INTO network_flow_map ( audit_id, source_server_id, source_hostname, source_ip, dest_ip, dest_port, dest_hostname, dest_server_id, process_name, process_user, direction, connection_count, state, audit_date ) VALUES ( :aid, :ssid, :shn, :sip, :dip, :dp, :dhn, :dsid, :pn, :pu, :dir, 1, :st, NOW() ) """), { "aid": audit_id, "ssid": server_id, "shn": hostname, "sip": source_ip, "dip": dest_ip, "dp": dest_port, "dhn": dest_hostname, "dsid": dest_server_id, "pn": conn.get("process", ""), "pu": conn.get("user", ""), "dir": conn.get("direction", ""), "st": conn.get("state", ""), }) # ── IMPORT JSON (depuis standalone) ── def import_json_report(db, json_data): servers = json_data.get("servers", []) imported = 0 errors = 0 for srv in servers: if srv.get("status") == "error": errors += 1 continue hostname = srv.get("hostname", "") if not hostname: continue parsed = {k: srv.get(k, v) for k, v in { "hostname": "", "os_release": "", "kernel": "", "uptime": "", "services": [], "processes": [], "services_failed": "", "needs_restarting": "", "reboot_required": False, "disk_usage": [], "interfaces": [], "routes": [], "listen_ports": [], "connections": [], "flux_in": [], "flux_out": [], "conn_wait": [], "net_stats": {}, "traffic": [], "firewall": {}, "correlation_matrix": [], "outbound_only": [], }.items()} save_audit_to_db(db, parsed) imported += 1 db.commit() return imported, errors # ── REQUETES ── def get_latest_audits(db, limit=100): return db.execute(text(""" SELECT DISTINCT ON (hostname) id, server_id, hostname, audit_date, os_release, kernel, uptime, status, reboot_required, last_patch_date, last_patch_week, last_patch_year, jsonb_array_length(COALESCE(services, '[]')) as svc_count, jsonb_array_length(COALESCE(listen_ports, '[]')) as port_count, jsonb_array_length(COALESCE(connections, '[]')) as conn_count, jsonb_array_length(COALESCE(processes, '[]')) as proc_count FROM server_audit_full WHERE status = 'ok' ORDER BY hostname, audit_date DESC LIMIT :lim """), {"lim": limit}).fetchall() def get_audit_detail(db, audit_id): return db.execute(text( "SELECT * FROM server_audit_full WHERE id = :id" ), {"id": audit_id}).fetchone() def get_flow_map(db): return db.execute(text(""" SELECT source_hostname, source_ip, dest_ip, dest_port, dest_hostname, process_name, direction, state, COUNT(*) as cnt FROM network_flow_map nfm JOIN server_audit_full saf ON nfm.audit_id = saf.id WHERE saf.id IN ( SELECT DISTINCT ON (hostname) id FROM server_audit_full WHERE status = 'ok' ORDER BY hostname, audit_date DESC ) GROUP BY source_hostname, source_ip, dest_ip, dest_port, dest_hostname, process_name, direction, state ORDER BY source_hostname """)).fetchall() def get_flow_map_for_server(db, hostname): return db.execute(text(""" SELECT source_hostname, source_ip, dest_ip, dest_port, dest_hostname, process_name, direction, state FROM network_flow_map WHERE audit_id = ( SELECT id FROM server_audit_full WHERE hostname = :h ORDER BY audit_date DESC LIMIT 1 ) ORDER BY direction DESC, dest_ip """), {"h": hostname}).fetchall() def get_flow_map_for_domain(db, domain_code): return db.execute(text(""" SELECT nfm.source_hostname, nfm.source_ip, nfm.dest_ip, nfm.dest_port, nfm.dest_hostname, nfm.process_name, nfm.direction, nfm.state FROM network_flow_map nfm JOIN server_audit_full saf ON nfm.audit_id = saf.id JOIN servers s ON saf.server_id = s.id JOIN domain_environments de ON s.domain_env_id = de.id JOIN domains d ON de.domain_id = d.id WHERE d.code = :dc AND saf.id IN ( SELECT DISTINCT ON (hostname) id FROM server_audit_full WHERE status = 'ok' ORDER BY hostname, audit_date DESC ) ORDER BY nfm.source_hostname """), {"dc": domain_code}).fetchall() def get_app_map(db): audits = db.execute(text(""" SELECT DISTINCT ON (hostname) hostname, server_id, processes, listen_ports FROM server_audit_full WHERE status = 'ok' ORDER BY hostname, audit_date DESC """)).fetchall() app_groups = {} for audit in audits: processes = audit.processes if isinstance(audit.processes, list) else json.loads(audit.processes or "[]") for proc in processes: cwd = proc.get("cwd", "") m = re.search(r'/applis/([^/]+)', cwd) if not m: continue app_name = m.group(1) if app_name not in app_groups: app_groups[app_name] = {"servers": [], "ports": set()} if audit.hostname not in [s["hostname"] for s in app_groups[app_name]["servers"]]: app_groups[app_name]["servers"].append({ "hostname": audit.hostname, "server_id": audit.server_id, "user": proc.get("user", ""), "cmdline": proc.get("cmdline", "")[:100], "restart_hint": proc.get("restart_hint", "")[:100], }) listen = audit.listen_ports if isinstance(audit.listen_ports, list) else json.loads(audit.listen_ports or "[]") pid = proc.get("pid", "") for lp in listen: if lp.get("pid") == pid: app_groups[app_name]["ports"].add(lp.get("addr_port", "")) for k in app_groups: app_groups[k]["ports"] = list(app_groups[k]["ports"]) return app_groups