patchcenter/app/services/realtime_audit_service.py

553 lines
23 KiB
Python

"""Real-time audit service — runs SSH checks against servers and returns the results."""
import socket
import json
import re
from datetime import datetime
from sqlalchemy import text
try:
import paramiko
PARAMIKO_OK = True
except ImportError:
PARAMIKO_OK = False
# Fallback SSH credentials location/user, used when app_secrets provides none.
SSH_KEY_DEFAULT = "/opt/patchcenter/keys/id_ed25519"
SSH_USER_DEFAULT = "root"

# Timeout (seconds) handed to paramiko when opening a connection.
SSH_TIMEOUT = 12

# Suffixes appended to a bare hostname during resolution.
# The empty string means "try the hostname exactly as given".
DNS_SUFFIXES_DEFAULT = [
    "",
    ".mpcz.fr",
    ".sanef.groupe",
    ".sanef-rec.fr",
    ".sanef.fr",
]
def _get_dns_suffixes():
    """Return the DNS suffixes to try when resolving a bare hostname.

    Reads the comma-separated ``ssh_dns_suffixes`` secret. Each entry is
    stripped; whitespace-only entries are dropped, while a truly empty entry
    (``""``, i.e. "use the hostname as-is") is kept.

    Returns:
        list[str]: the configured suffixes, or ``DNS_SUFFIXES_DEFAULT`` when
        the secret is missing/empty or when anything goes wrong reading it.
    """
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            val = get_secret(db, "ssh_dns_suffixes")
        finally:
            # Release the session even when the lookup raises.
            db.close()
        if val:
            return [x.strip() for x in val.split(",") if x.strip() or x == ""]
    except Exception:
        # Best-effort: import or DB problems fall back to the defaults.
        pass
    return DNS_SUFFIXES_DEFAULT
def _get_ssh_settings():
    """Read the SSH key and user from app_secrets in the database.

    The key material is taken from ``ssh_key_private_key`` (PEM content,
    newer setting); if that is empty, from ``ssh_key_file`` (legacy path),
    and finally from ``SSH_KEY_DEFAULT``. The user comes from
    ``ssh_key_default_user``, then ``ssh_user``, then ``SSH_USER_DEFAULT``.

    Returns:
        tuple[str, str]: ``(key_material, user)``. On any error the pair
        ``(SSH_KEY_DEFAULT, SSH_USER_DEFAULT)`` is returned.
    """
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            # Newer setting: PEM content stored directly.
            key_material = get_secret(db, "ssh_key_private_key")
            if not key_material:
                key_material = get_secret(db, "ssh_key_file") or SSH_KEY_DEFAULT
            user = (
                get_secret(db, "ssh_key_default_user")
                or get_secret(db, "ssh_user")
                or SSH_USER_DEFAULT
            )
        finally:
            # Release the session even when a lookup raises.
            db.close()
        return key_material, user
    except Exception:
        return SSH_KEY_DEFAULT, SSH_USER_DEFAULT
# Audit commands (simplified for real-time use).
# Maps a result key to the shell command whose output is stored under that key.
# The ".service" suffix is stripped with an escaped, end-anchored pattern
# (s/\.service$//): an unescaped "." matches any character, so a unit such as
# "user-service.service" would otherwise be mangled into "user.service".
AUDIT_CMDS = {
    "os_release": "cat /etc/redhat-release 2>/dev/null || grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2",
    "kernel": "uname -r",
    "uptime": "uptime -p 2>/dev/null || uptime",
    "selinux": "getenforce 2>/dev/null || echo N/A",
    "disk_space": "df -h --output=target,size,avail,pcent 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem)' | sort",
    "apps_installed": "rpm -qa --qf '%{NAME} %{VERSION}\\n' 2>/dev/null | grep -iE 'tomcat|java|jdk|nginx|httpd|haproxy|docker|podman|postgresql|postgres|mysql|mariadb|mongodb|oracle|redis|elasticsearch|splunk|centreon|qualys' | sort -u",
    "services_running": "systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/\\.service$//' | sort",
    # Uses bash process substitution to diff running units against enabled ones.
    "running_not_enabled": "comm -23 <(systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/\\.service$//' | sort) <(systemctl list-unit-files --type=service --state=enabled --no-pager --no-legend 2>/dev/null | awk '{print $1}' | sed 's/\\.service$//' | sort) 2>/dev/null || echo none",
    "listening_ports": "ss -tlnp 2>/dev/null | grep LISTEN | grep -vE ':22 |:111 |:323 ' | awk '{print $4, $6}' | sort",
    "db_detect": "for svc in postgresql mariadbd mysqld mongod redis-server; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:active\"; done; pgrep -x ora_pmon >/dev/null 2>&1 && echo 'oracle:active' || true",
    "cluster_detect": "(which pcs 2>/dev/null && pcs status 2>/dev/null | head -3) || (test -f /etc/corosync/corosync.conf && echo 'corosync:present') || echo 'no_cluster'",
    "containers": "if which podman >/dev/null 2>&1; then USERS=$(ps aux 2>/dev/null | grep -E 'conmon|podman' | grep -v grep | awk '{print $1}' | sort -u); for U in $USERS; do echo \"=== podman@$U ===\"; su - $U -c 'podman ps -a --format \"table {{.Names}} {{.Status}}\"' 2>/dev/null; done; fi; if which docker >/dev/null 2>&1; then docker ps -a --format 'table {{.Names}} {{.Status}}' 2>/dev/null; fi",
    # Only services reported as "active" are echoed, as "<service>:active".
    "agents": "for svc in qualys-cloud-agent sentinelone zabbix-agent; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:$state\"; done",
    "failed_services": "systemctl list-units --type=service --state=failed --no-pager --no-legend 2>/dev/null | awk '{print $2}' | head -10 || echo none",
    "satellite": "subscription-manager identity 2>/dev/null | grep -i 'org\\|server' || echo 'not_registered'",
}
# Substrings identifying login-banner lines; any output line containing one of
# them is dropped from command results.
BANNER_FILTERS = [
    "GROUPE SANEF",
    "propriété du Groupe",
    "accèderait",
    "emprisonnement",
    "Article 323",
    "code pénal",
    "Authorized uses only",
    "CyberArk",
    "This session",
    "session is being",
]
def _ordered_suffixes(hostname):
    """Return the configured DNS suffixes, most likely one first.

    The order is driven by the second letter of the hostname (SANEF naming
    convention: r=recette, p=prod, i=infra). An "r" puts ".sanef-rec.fr"
    first; any other letter puts ".sanef.groupe" first. Configured suffixes
    outside the priority list follow, in their configured order, as fallback.
    """
    configured = _get_dns_suffixes()
    env_letter = hostname[1].lower() if len(hostname) > 1 else ""

    if env_letter == "r":
        preferred = [".sanef-rec.fr", ".sanef.groupe", ".sanef.fr"]
    else:
        preferred = [".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"]

    # Priority suffixes that are actually configured come first; dict.fromkeys
    # then removes repeats while keeping the first occurrence of each suffix.
    leading = [suffix for suffix in preferred if suffix in configured]
    return list(dict.fromkeys([*leading, *configured]))
def _resolve(hostname):
    """Find a reachable name for *hostname*.

    1. If the ``servers`` table holds a non-empty ``fqdn`` for this hostname
       (case-insensitive match), return it directly without probing the port.
    2. Otherwise try ``hostname + suffix`` for each suffix from
       ``_ordered_suffixes`` and return the first one accepting a TCP
       connection on port 22 within 2 seconds.

    Returns:
        str | None: the FQDN / target name, or None when nothing answered.
    """
    # 1. FQDN stored in the database: returned as-is, no port check (fast path).
    try:
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            row = db.execute(text(
                "SELECT fqdn FROM servers WHERE LOWER(hostname)=LOWER(:h) "
                "AND fqdn IS NOT NULL AND fqdn != ''"
            ), {"h": hostname}).fetchone()
        finally:
            # Release the session even when the query raises.
            db.close()
        if row and row.fqdn:
            return row.fqdn
    except Exception:
        # Best-effort lookup: fall through to the DNS suffix probe.
        pass
    # 2. Fallback: probe each DNS suffix (FQDN missing from the database).
    for suffix in _ordered_suffixes(hostname):
        target = hostname + suffix
        try:
            # The context manager closes the socket on every path, including
            # when connect_ex raises because the name does not resolve.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                if sock.connect_ex((target, 22)) == 0:
                    return target
        except (OSError, ValueError):
            # Resolution failure (gaierror is an OSError) or an invalid name
            # (UnicodeError is a ValueError): try the next suffix.
            continue
    return None
def _connect_via_psmp(target):
    """Connect through the CyberArk PSMP gateway (keyboard-interactive auth).

    The PSMP host/port, CyberArk user, target user and password are read from
    app_secrets, each with a built-in default except the password. The login
    name sent to the gateway is ``<cyberark_user>@<target_user>@<target>`` and
    every interactive prompt is answered with the password.

    Returns:
        paramiko.SSHClient | None: a client wrapping the authenticated
        transport, or None when paramiko is unavailable, no password is
        configured, or the connection/authentication fails.
    """
    if not PARAMIKO_OK:
        return None
    transport = None
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            psmp_host = get_secret(db, "psmp_host") or "psmp.sanef.fr"
            psmp_port = int(get_secret(db, "psmp_port") or "22")
            cyber_user = get_secret(db, "psmp_cyberark_user") or "CYBP01336"
            target_user = get_secret(db, "psmp_target_user") or "cybsecope"
            password = get_secret(db, "ssh_pwd_default_pass") or ""
        finally:
            # Release the session even when a lookup raises.
            db.close()
        if not password:
            return None
        username = f"{cyber_user}@{target_user}@{target}"
        # NOTE(review): no host key check is performed on this transport —
        # confirm this is acceptable for the PSMP gateway.
        transport = paramiko.Transport((psmp_host, psmp_port))
        transport.start_client(timeout=SSH_TIMEOUT)
        transport.auth_interactive(username, lambda t, i, p: [password] * len(p))
        if transport.is_authenticated():
            client = paramiko.SSHClient()
            # SSHClient has no public way to adopt an existing transport.
            client._transport = transport
            return client
    except Exception:
        # Any failure is reported as "no connection"; cleanup happens below.
        pass
    # Failure path: do not leave the gateway socket/thread open.
    if transport is not None:
        try:
            transport.close()
        except Exception:
            pass
    return None
def _resolve_ssh_method(hostname):
    """Return the ``ssh_method`` configured for the server, if any.

    The lookup uses the short name (text before the first ".") and matches
    ``servers.hostname`` case-insensitively. Expected values are
    ssh_psmp / ssh_key / ssh_password.

    Returns:
        str | None: the stored method, or None when the server is unknown or
        the lookup fails.
    """
    try:
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            row = db.execute(text(
                "SELECT ssh_method FROM servers WHERE LOWER(hostname)=LOWER(:h)"
            ), {"h": hostname.split(".")[0]}).fetchone()
        finally:
            # Release the session even when the query raises.
            db.close()
        return row.ssh_method if row else None
    except Exception:
        return None
def _connect(target, hostname=None):
    """Open an SSH session to *target*, trying each auth method in turn.

    Order of attempts:
      1. PSMP, only when the server's ``ssh_method`` is ``'ssh_psmp'``
         (looked up with *hostname*, or *target* when no hostname is given);
         on failure the direct methods below are still tried.
      2. The private key from the SSH settings: PEM content, or a legacy file
         path that exists on disk. Ed25519, RSA and ECDSA loaders are tried
         in that order.
      3. The default password from app_secrets, if one is configured.

    Returns:
        paramiko.SSHClient | None: a connected client, or None when paramiko
        is unavailable or every attempt failed.
    """
    if not PARAMIKO_OK:
        return None
    import os
    from io import StringIO

    # PSMP routing when ssh_method='ssh_psmp' for this server.
    method = _resolve_ssh_method(hostname or target)
    if method == "ssh_psmp":
        client = _connect_via_psmp(target)
        if client:
            return client
        # PSMP failed: fall back to direct SSH.
    ssh_key, ssh_user = _get_ssh_settings()

    # 1. Key from the settings (PEM content or legacy path).
    key_sources = []
    if ssh_key and "BEGIN" in ssh_key and "PRIVATE KEY" in ssh_key:
        key_sources = [("content", ssh_key)]
    elif ssh_key and os.path.exists(ssh_key):
        key_sources = [("file", ssh_key)]
    for src_type, src in key_sources:
        for key_cls in (paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey):
            try:
                if src_type == "file":
                    key = key_cls.from_private_key_file(src)
                else:
                    key = key_cls.from_private_key(StringIO(src))
            except Exception:
                # Wrong key type for this loader: try the next one.
                continue
            client = _open_ssh_client(target, ssh_user, pkey=key)
            if client:
                return client

    # 2. Fallback: password from the settings.
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            pwd_user = get_secret(db, "ssh_pwd_default_user") or ssh_user
            pwd_pass = get_secret(db, "ssh_pwd_default_pass") or ""
        finally:
            # Release the session even when a lookup raises.
            db.close()
    except Exception:
        return None
    if pwd_pass:
        return _open_ssh_client(target, pwd_user, password=pwd_pass)
    return None


def _open_ssh_client(target, username, **auth):
    """Connect a new SSHClient to target:22; close it and return None on failure.

    *auth* carries the credential keyword for ``SSHClient.connect``
    (``pkey=...`` or ``password=...``).
    """
    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification — confirm this is acceptable for the audited network.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(target, port=22, username=username,
                       timeout=SSH_TIMEOUT, look_for_keys=False,
                       allow_agent=False, **auth)
        return client
    except Exception:
        # Do not leave a half-open client (socket/transport) behind.
        if client is not None:
            try:
                client.close()
            except Exception:
                pass
        return None
def _run(client, cmd):
    """Run *cmd* on the remote host and return its cleaned-up output.

    ``id -u`` is executed first: when it prints "0" the command runs as-is,
    otherwise it is wrapped in ``sudo bash -c '...'``. If that produces no
    stdout and stderr looks like a sudo refusal, the command is retried
    without sudo.

    Returns:
        str: stdout (or stderr when stdout is empty) with banner lines (see
        ``BANNER_FILTERS``) and blank lines removed. On any exception the
        string ``"ERROR: <exception>"`` is returned instead.
    """
    def _text(stream):
        # Drain a channel stream and decode it leniently.
        return stream.read().decode("utf-8", errors="replace").strip()

    refusal_markers = [
        "pas autoris", "non autoris", "not allowed to execute",
        "is not allowed", "no tty present", "sudo:",
    ]
    try:
        # Root or sudo?
        _, uid_stream, _ = client.exec_command("id -u", timeout=5)
        if uid_stream.read().decode().strip() == "0":
            full = cmd
        else:
            # Close the quote, emit a quoted ', reopen: the standard way to
            # embed a single quote inside a single-quoted shell string.
            escaped = cmd.replace("'", "'\"'\"'")
            full = f"sudo bash -c '{escaped}'"

        _, out_stream, err_stream = client.exec_command(full, timeout=15)
        out = _text(out_stream)
        err = _text(err_stream)

        # Retry without sudo when sudoers refused (lower-cased match, and the
        # French markers are cut before the accented character).
        lowered = err.lower()
        if not out and err and any(marker in lowered for marker in refusal_markers):
            _, out_stream, err_stream = client.exec_command(cmd, timeout=15)
            out = _text(out_stream)
            retry_err = _text(err_stream)
            err = "" if out else (retry_err or err)

        kept = []
        for line in (out or err).splitlines():
            if not line.strip():
                continue
            if any(banner in line for banner in BANNER_FILTERS):
                continue
            kept.append(line)
        return "\n".join(kept).strip()
    except Exception as e:
        return f"ERROR: {e}"
def audit_single_server(hostname):
    """Audit one server synchronously and return a dict of results.

    The dict always carries ``hostname``, ``audit_date`` and ``status``
    ("CONNECTION_FAILED" when resolution or SSH fails, "OK" otherwise). On
    success it also holds one entry per ``AUDIT_CMDS`` key plus the derived
    flags ``qualys_active``, ``sentinelone_active`` and ``disk_alert``.
    """
    def _over_threshold(df_line):
        # True when the first "%"-bearing token of the line is >= 90.
        pct_tokens = [tok for tok in df_line.split() if "%" in tok]
        if not pct_tokens:
            return False
        try:
            return int(pct_tokens[0].replace("%", "")) >= 90
        except ValueError:
            return False

    report = {
        "hostname": hostname,
        "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M"),
        "status": "PENDING",
    }

    fqdn = _resolve(hostname)
    if not fqdn:
        report.update(
            status="CONNECTION_FAILED",
            connection_method=f"DNS: aucun suffixe résolu ({hostname})",
            resolved_fqdn=None,
        )
        return report
    report["resolved_fqdn"] = fqdn

    ssh = _connect(fqdn, hostname)
    if not ssh:
        report.update(
            status="CONNECTION_FAILED",
            connection_method=f"SSH: connexion refusée ({fqdn})",
        )
        return report

    report["status"] = "OK"
    # NOTE(review): the label always says "ssh_key", even when _connect ended
    # up using PSMP or the password fallback — confirm whether that matters.
    _, login = _get_ssh_settings()
    report["connection_method"] = f"ssh_key ({login}@{fqdn})"
    report.update((name, _run(ssh, command)) for name, command in AUDIT_CMDS.items())
    try:
        ssh.close()
    except Exception:
        pass

    # Post-processing: derive flags from the raw command output.
    agents_out = report.get("agents", "")
    any_active = "active" in agents_out
    report["qualys_active"] = "qualys" in agents_out and any_active
    report["sentinelone_active"] = "sentinelone" in agents_out and any_active
    disk_lines = (report.get("disk_space") or "").split("\n")
    report["disk_alert"] = any(_over_threshold(line) for line in disk_lines)
    return report
def audit_servers_list(hostnames):
    """Audit each hostname in turn (whitespace-stripped) and return the result dicts in order."""
    return [audit_single_server(name.strip()) for name in hostnames]
# ═══════════════════════════════════════════════
# Background audit job manager
# ═══════════════════════════════════════════════
import threading
import uuid
import time as _time
# In-memory registry of background audit jobs, keyed by job id.
# Filled by start_audit_job and read by get_audit_job / list_audit_jobs.
_audit_jobs = {}
def start_audit_job(hostnames, parallel=3):
    """Start a background audit using a bounded thread pool.

    Args:
        hostnames: sized collection of hostnames; each is whitespace-stripped.
        parallel: maximum number of worker threads (at least 1 is used).

    Returns:
        str: the job id (first 8 characters of a UUID4), usable with
        ``get_audit_job``. The job dict tracks per-server progress under
        ``servers``, finished results under ``results``, and sets
        ``finished`` / ``finished_at`` once the pool has drained.
    """
    from concurrent.futures import ThreadPoolExecutor

    # Strip once, up front: the keys of job["servers"] must be exactly the
    # names handed to _audit_one, otherwise its lookup fails with a KeyError
    # that the pool silently swallows.
    names = [hn.strip() for hn in hostnames]

    job_id = str(uuid.uuid4())[:8]
    job = {
        "id": job_id,
        "started_at": _time.time(),
        "total": len(names),
        "done": 0,
        "servers": {},
        "results": [],
        "finished": False,
        "parallel": parallel,
    }
    for hn in names:
        job["servers"][hn] = {"hostname": hn, "stage": "pending", "detail": "En attente", "status": None}
    _audit_jobs[job_id] = job

    # Named _worker so it does not shadow the module-level _run().
    def _worker():
        try:
            # Leaving the "with" block waits for every submitted audit.
            with ThreadPoolExecutor(max_workers=max(1, int(parallel))) as pool:
                for hn in names:
                    pool.submit(_audit_one, job, hn)
        finally:
            # Always mark the job finished, even if the pool setup failed,
            # so pollers do not wait forever.
            job["finished"] = True
            job["finished_at"] = _time.time()

    threading.Thread(target=_worker, daemon=True).start()
    return job_id
def _audit_one(job, hostname):
job["servers"][hostname]["stage"] = "resolving"
job["servers"][hostname]["detail"] = "Résolution DNS"
target = _resolve(hostname)
if not target:
job["servers"][hostname]["stage"] = "failed"
job["servers"][hostname]["detail"] = "DNS: aucun suffixe résolu"
job["servers"][hostname]["status"] = "CONNECTION_FAILED"
result = {"hostname": hostname, "status": "CONNECTION_FAILED",
"connection_method": f"DNS: aucun suffixe résolu ({hostname})", "resolved_fqdn": None}
job["results"].append(result)
job["done"] += 1
return
job["servers"][hostname]["stage"] = "connecting"
job["servers"][hostname]["detail"] = f"Connexion SSH → {target}"
client = _connect(target, hostname)
if not client:
job["servers"][hostname]["stage"] = "failed"
job["servers"][hostname]["detail"] = f"SSH refusé ({target})"
job["servers"][hostname]["status"] = "CONNECTION_FAILED"
result = {"hostname": hostname, "status": "CONNECTION_FAILED",
"connection_method": f"SSH: connexion refusée ({target})", "resolved_fqdn": target}
job["results"].append(result)
job["done"] += 1
return
job["servers"][hostname]["stage"] = "auditing"
job["servers"][hostname]["detail"] = "Collecte des données"
result = {"hostname": hostname, "status": "OK", "resolved_fqdn": target,
"audit_date": datetime.now().strftime("%Y-%m-%d %H:%M")}
ssh_key, ssh_user = _get_ssh_settings()
result["connection_method"] = f"ssh_key ({ssh_user}@{target})"
for key, cmd in AUDIT_CMDS.items():
result[key] = _run(client, cmd)
try:
client.close()
except Exception:
pass
# Post-traitement
agents = result.get("agents", "")
result["qualys_active"] = "qualys" in agents and "active" in agents
result["sentinelone_active"] = "sentinelone" in agents and "active" in agents
result["disk_alert"] = False
for line in (result.get("disk_space") or "").split("\n"):
parts = line.split()
pcts = [p for p in parts if "%" in p]
if pcts:
try:
pct = int(pcts[0].replace("%", ""))
if pct >= 90:
result["disk_alert"] = True
except ValueError:
pass
job["servers"][hostname]["stage"] = "success"
job["servers"][hostname]["detail"] = result.get("os_release", "OK")
job["servers"][hostname]["status"] = "OK"
job["results"].append(result)
job["done"] += 1
def get_audit_job(job_id):
    """Return the job dict registered under *job_id*, or None when unknown."""
    try:
        return _audit_jobs[job_id]
    except KeyError:
        return None
def list_audit_jobs():
    """Return ``{job_id: job}`` for the jobs started less than 3600 seconds ago."""
    now = _time.time()
    recent = {}
    for job_id, job in _audit_jobs.items():
        if now - job["started_at"] < 3600:
            recent[job_id] = job
    return recent
def save_audit_to_db(db, results):
    """Save or update audit results in the database.

    For each result with a hostname, the matching ``servers`` row is looked
    up by short name (case-insensitive). If a ``server_audit`` row already
    exists for that server it is updated, otherwise a new row is inserted.
    For results with status "OK" on a known server, ``servers.os_version`` /
    ``servers.fqdn`` are refreshed and the resolved IP is added to
    ``server_ips`` when not already present. A single commit is issued at
    the end.

    Returns:
        tuple[int, int]: ``(updated, inserted)`` counts for ``server_audit``.
    """
    n_updated = 0
    n_inserted = 0
    for res in results:
        hostname = res.get("hostname", "")
        if not hostname:
            continue

        # Find the server id from the short hostname.
        server_row = db.execute(
            text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
            {"h": hostname.split(".")[0]},
        ).fetchone()
        server_id = server_row.id if server_row else None
        audit_date = datetime.now()

        # Bind parameters shared by the UPDATE and the INSERT below.
        shared = {
            "st": res.get("status"), "cm": res.get("connection_method"),
            "rf": res.get("resolved_fqdn"), "os": res.get("os_release"), "k": res.get("kernel"),
            "up": res.get("uptime"), "se": res.get("selinux"), "dd": res.get("disk_space"),
            "da": res.get("disk_alert", False), "ai": res.get("apps_installed"),
            "sr": res.get("services_running"), "rne": res.get("running_not_enabled"),
            "lp": res.get("listening_ports"), "db": res.get("db_detect"),
            "cl": res.get("cluster_detect"), "co": res.get("containers"),
            "ag": res.get("agents", ""), "qa": res.get("qualys_active", False),
            "s1": res.get("sentinelone_active", False), "fs": res.get("failed_services"),
            "ad": audit_date,
        }

        # Upsert: an existing audit row is only searched for known servers.
        # NOTE(review): results for hosts absent from `servers` are always
        # inserted, so repeated audits add rows — confirm this is intended.
        existing = None
        if server_id:
            existing = db.execute(text(
                "SELECT id FROM server_audit WHERE server_id = :sid AND server_id IS NOT NULL"
            ), {"sid": server_id}).fetchone()

        if existing:
            db.execute(text("""
UPDATE server_audit SET
status = :st, connection_method = :cm, resolved_fqdn = :rf,
os_release = :os, kernel = :k, uptime = :up, selinux = :se,
disk_detail = :dd, disk_alert = :da,
apps_installed = :ai, services_running = :sr,
running_not_enabled = :rne, listening_ports = :lp,
db_detected = :db, cluster_detected = :cl, containers = :co,
agents = :ag, qualys_active = :qa, sentinelone_active = :s1,
failed_services = :fs, audit_date = :ad
WHERE id = :id
"""), {"id": existing.id, **shared})
            n_updated += 1
        else:
            db.execute(text("""
INSERT INTO server_audit (server_id, hostname, audit_date, status, connection_method,
resolved_fqdn, os_release, kernel, uptime, selinux, disk_detail, disk_alert,
apps_installed, services_running, running_not_enabled, listening_ports,
db_detected, cluster_detected, containers, agents, qualys_active,
sentinelone_active, failed_services)
VALUES (:sid, :hn, :ad, :st, :cm, :rf, :os, :k, :up, :se, :dd, :da,
:ai, :sr, :rne, :lp, :db, :cl, :co, :ag, :qa, :s1, :fs)
"""), {"sid": server_id, "hn": hostname, **shared})
            n_inserted += 1

        # Only successful audits of known servers refresh the servers table.
        if not (server_id and res.get("status") == "OK"):
            continue

        resolved = res.get("resolved_fqdn", "")
        # Resolve the IP from the FQDN (best-effort).
        ip_addr = None
        if resolved:
            try:
                ip_addr = socket.gethostbyname(resolved)
            except Exception:
                pass

        from .itop_service import _normalize_os_for_itop
        updates = {}
        if res.get("os_release"):
            updates["os_version"] = _normalize_os_for_itop(res["os_release"].strip())
        if ip_addr:
            # The FQDN is stored only when it actually resolves.
            updates["fqdn"] = resolved
        if updates:
            # Column names come from the fixed keys above, never from input.
            sets = ", ".join(f"{col} = :{col}" for col in updates)
            db.execute(
                text(f"UPDATE servers SET {sets}, updated_at = NOW() WHERE id = :sid"),
                {**updates, "sid": server_id},
            )

        # Insert the IP into server_ips unless it is already recorded.
        if ip_addr:
            ip_params = {"sid": server_id, "ip": ip_addr}
            known_ip = db.execute(text(
                "SELECT id FROM server_ips WHERE server_id = :sid AND ip_address = :ip"
            ), ip_params).fetchone()
            if not known_ip:
                db.execute(text(
                    "INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'audit')"
                ), ip_params)

    db.commit()
    return n_updated, n_inserted