"""Service audit temps reel — lance des checks SSH et retourne les resultats""" import socket import json import re from datetime import datetime from sqlalchemy import text try: import paramiko PARAMIKO_OK = True except ImportError: PARAMIKO_OK = False SSH_KEY_DEFAULT = "/opt/patchcenter/keys/id_ed25519" SSH_USER_DEFAULT = "root" SSH_TIMEOUT = 12 DNS_SUFFIXES_DEFAULT = ["", ".mpcz.fr", ".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"] def _get_dns_suffixes(): try: from .secrets_service import get_secret from ..database import SessionLocal db = SessionLocal() val = get_secret(db, "ssh_dns_suffixes") db.close() if val: return [x.strip() for x in val.split(",") if x.strip() or x == ""] except Exception: pass return DNS_SUFFIXES_DEFAULT def _get_ssh_settings(): """Lit les settings SSH depuis app_secrets dans la DB. Retourne (key_material, user). key_material peut etre un chemin (legacy) ou le contenu PEM (nouveau).""" try: from .secrets_service import get_secret from ..database import SessionLocal db = SessionLocal() # Nouveau: contenu PEM direct key_material = get_secret(db, "ssh_key_private_key") if not key_material: key_material = get_secret(db, "ssh_key_file") or SSH_KEY_DEFAULT user = get_secret(db, "ssh_key_default_user") or get_secret(db, "ssh_user") or SSH_USER_DEFAULT db.close() return key_material, user except Exception: return SSH_KEY_DEFAULT, SSH_USER_DEFAULT # Commandes d'audit (simplifiees pour le temps reel) AUDIT_CMDS = { "os_release": "cat /etc/redhat-release 2>/dev/null || grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2", "kernel": "uname -r", "uptime": "uptime -p 2>/dev/null || uptime", "selinux": "getenforce 2>/dev/null || echo N/A", "disk_space": "df -h --output=target,size,avail,pcent 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem)' | sort", "apps_installed": "rpm -qa --qf '%{NAME} %{VERSION}\\n' 2>/dev/null | grep -iE 'tomcat|java|jdk|nginx|httpd|haproxy|docker|podman|postgresql|postgres|mysql|mariadb|mongodb|oracle|redis|elasticsearch|splunk|centreon|qualys' | sort -u", "services_running": "systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/.service//' | sort", "running_not_enabled": "comm -23 <(systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/.service//' | sort) <(systemctl list-unit-files --type=service --state=enabled --no-pager --no-legend 2>/dev/null | awk '{print $1}' | sed 's/.service//' | sort) 2>/dev/null || echo none", "listening_ports": "ss -tlnp 2>/dev/null | grep LISTEN | grep -vE ':22 |:111 |:323 ' | awk '{print $4, $6}' | sort", "db_detect": "for svc in postgresql mariadbd mysqld mongod redis-server; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:active\"; done; pgrep -x ora_pmon >/dev/null 2>&1 && echo 'oracle:active' || true", "cluster_detect": "(which pcs 2>/dev/null && pcs status 2>/dev/null | head -3) || (test -f /etc/corosync/corosync.conf && echo 'corosync:present') || echo 'no_cluster'", "containers": "if which podman >/dev/null 2>&1; then USERS=$(ps aux 2>/dev/null | grep -E 'conmon|podman' | grep -v grep | awk '{print $1}' | sort -u); for U in $USERS; do echo \"=== podman@$U ===\"; su - $U -c 'podman ps -a --format \"table {{.Names}} {{.Status}}\"' 2>/dev/null; done; fi; if which docker >/dev/null 2>&1; then docker ps -a --format 'table {{.Names}} {{.Status}}' 2>/dev/null; fi", "agents": "for svc in qualys-cloud-agent sentinelone zabbix-agent; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:$state\"; done", "failed_services": "systemctl list-units --type=service --state=failed --no-pager --no-legend 2>/dev/null | awk '{print $2}' | head -10 || echo none", "satellite": "subscription-manager identity 2>/dev/null | grep -i 'org\\|server' || echo 'not_registered'", } BANNER_FILTERS = [ "GROUPE SANEF", "propriété du Groupe", "accèderait", "emprisonnement", "Article 323", "code pénal", "Authorized uses only", "CyberArk", "This session", "session is being", ] def _ordered_suffixes(hostname): """Ordre des suffixes selon la 2e lettre du hostname (convention SANEF). Recette : v[rtdv]* → .sanef-rec.fr en priorité. Prod : v[pls]* / sp / lp → .sanef.groupe puis .sanef.fr en priorité. Inconnu : ordre par défaut.""" all_suffixes = _get_dns_suffixes() second = hostname[1].lower() if len(hostname) > 1 else "" # Recette / Test / Dev / Qualif (vr/vt/vd/vv) if second in ("r", "t", "d", "v"): priority = [".sanef-rec.fr", ".sanef.groupe", ".sanef.fr"] # Prod ou infra (vp/sp/lp/i) elif second in ("p", "i", "l", "s"): priority = [".sanef.groupe", ".sanef.fr", ".sanef-rec.fr"] else: priority = [".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"] ordered = [] for suf in priority: if suf in all_suffixes and suf not in ordered: ordered.append(suf) for suf in all_suffixes: if suf not in ordered: ordered.append(suf) return ordered def _fqdn_is_consistent(hostname, fqdn): """Valide qu'un FQDN BDD est utilisable. Règle : le FQDN doit commencer par le hostname suivi d'un point (sinon c'est probablement une donnée cassée, ex: 'unknown.example.com' pour un host 'vpwhatever'). On NE VÉRIFIE PLUS la convention SANEF des préfixes (vr* → .sanef-rec.fr) parce qu'il y a des exceptions légitimes (ex: vrexpbtex1.sanef.groupe). Si l'opérateur a explicitement renseigné un FQDN, on lui fait confiance. La convention reste utilisée pour l'INFÉRENCE quand le FQDN BDD est vide (cf _ordered_suffixes).""" if not fqdn or not hostname: return False return fqdn.lower().startswith(hostname.lower() + ".") def _candidate_targets(hostname): """Retourne la liste des FQDN candidats à essayer SSH pour ce hostname, dans l'ordre de priorité (logique SANEF) : 1. **Convention d'abord** : suffixe le plus probable selon le préfixe (ex: vr*/lr*/sr* → .sanef-rec.fr ; vp*/lp*/sp* → .sanef.groupe). 2. **FQDN BDD** : si servers.fqdn est renseigné (et commence par hostname), on l'ajoute en 2ᵉ position. Cas d'usage : serveurs exception qui ne suivent pas la convention (ex: vrexpbtex1 sur .sanef.groupe). 3. **Autres suffixes** du référentiel DNS, dans l'ordre conventionnel. Le caller doit essayer SSH sur chaque candidat dans l'ordre, et retourner le 1er qui aboutit. Permettre le test SSH (et pas juste TCP port 22) évite les faux positifs (ex: hostname homonyme qui répond port 22 sur la mauvaise zone DNS).""" out = [] seen = set() suffixes = _ordered_suffixes(hostname) # 1. Convention en premier (1er suffixe selon le préfixe) if suffixes: first = hostname + suffixes[0] out.append(first); seen.add(first.lower()) # 2. FQDN BDD si renseigné et plausible (commence par hostname.) try: from ..database import SessionLocal db = SessionLocal() row = db.execute(text( "SELECT fqdn FROM servers WHERE LOWER(hostname)=LOWER(:h) " "AND fqdn IS NOT NULL AND fqdn != ''" ), {"h": hostname}).fetchone() db.close() if row and row.fqdn and _fqdn_is_consistent(hostname, row.fqdn): f = row.fqdn.lower() if f not in seen: out.append(row.fqdn); seen.add(f) except Exception: pass # 3. Autres suffixes du référentiel for suffix in suffixes[1:]: target = hostname + suffix if target.lower() not in seen: out.append(target); seen.add(target.lower()) return out def _resolve(hostname): """Résolution best-effort : retourne le 1er candidat dont le port 22 répond TCP. Pour un check SSH complet (auth incluse), passer plutôt par _candidate_targets + boucle _connect côté caller.""" for target in _candidate_targets(hostname): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) r = sock.connect_ex((target, 22)) sock.close() if r == 0: return target except Exception: continue return None def _connect_via_psmp(target, errors=None): """Connexion via PSMP CyberArk (auth_interactive avec Vault Password). Si `errors` (list) est fourni, les exceptions y sont append en clair.""" if not PARAMIKO_OK: if errors is not None: errors.append("paramiko non disponible") return None try: from .secrets_service import get_secret from ..database import SessionLocal db = SessionLocal() psmp_host = get_secret(db, "psmp_host") or "psmp.sanef.fr" psmp_port = int(get_secret(db, "psmp_port") or "22") cyber_user = get_secret(db, "psmp_cyberark_user") or "CYBP01336" target_user = get_secret(db, "psmp_target_user") or "cybsecope" password = get_secret(db, "ssh_pwd_default_pass") or "" db.close() if not password: if errors is not None: errors.append("PSMP: ssh_pwd_default_pass non configuré") return None username = f"{cyber_user}@{target_user}@{target}" transport = paramiko.Transport((psmp_host, psmp_port)) transport.start_client(timeout=SSH_TIMEOUT) transport.auth_interactive(username, lambda t, i, p: [password] * len(p)) if not transport.is_authenticated(): if errors is not None: errors.append("PSMP: authentification refusée par le proxy") return None client = paramiko.SSHClient() client._transport = transport return client except Exception as e: if errors is not None: errors.append(f"PSMP: {type(e).__name__}: {e}") return None def _resolve_ssh_method(hostname): """Retourne ssh_method configure pour le serveur (ssh_psmp / ssh_key / ssh_password / None).""" try: from ..database import SessionLocal db = SessionLocal() row = db.execute(text( "SELECT ssh_method FROM servers WHERE LOWER(hostname)=LOWER(:h)" ), {"h": hostname.split(".")[0]}).fetchone() db.close() return row.ssh_method if row else None except Exception: return None def _connect(target, hostname=None, errors=None): """Établit une session SSH. Si `errors` (list) est fourni, les exceptions de chaque tentative (PSMP, clé, password) y sont append en clair pour aider au diagnostic côté UI.""" if not PARAMIKO_OK: if errors is not None: errors.append("paramiko non disponible côté serveur PatchCenter") return None import os method = _resolve_ssh_method(hostname or target) if method == "ssh_psmp": client = _connect_via_psmp(target, errors=errors) if client: return client # fallback SSH direct si PSMP KO ssh_key, ssh_user = _get_ssh_settings() key_sources = [] if ssh_key and "BEGIN" in ssh_key and "PRIVATE KEY" in ssh_key: key_sources = [("content", ssh_key)] elif ssh_key and os.path.exists(ssh_key): key_sources = [("file", ssh_key)] elif ssh_key: if errors is not None: errors.append(f"Clé SSH configurée mais introuvable / format invalide ({ssh_key[:50]}...)") if not key_sources and errors is not None: errors.append("Aucune clé SSH utilisable depuis Settings (ssh_key_private_key vide ou invalide)") for src_type, src in key_sources: for loader_name, loader_file, loader_str in [ ("Ed25519", paramiko.Ed25519Key.from_private_key_file, paramiko.Ed25519Key.from_private_key), ("RSA", paramiko.RSAKey.from_private_key_file, paramiko.RSAKey.from_private_key), ("ECDSA", paramiko.ECDSAKey.from_private_key_file, paramiko.ECDSAKey.from_private_key), ]: try: from io import StringIO if src_type == "file": key = loader_file(src) else: key = loader_str(StringIO(src)) client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(target, port=22, username=ssh_user, pkey=key, timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False) return client except Exception as e: if errors is not None: errors.append(f"Clé {loader_name} ({ssh_user}@{target}:22): {type(e).__name__}: {e}") continue # 2. Fallback mot de passe depuis les settings try: from .secrets_service import get_secret from ..database import SessionLocal db = SessionLocal() pwd_user = get_secret(db, "ssh_pwd_default_user") or ssh_user pwd_pass = get_secret(db, "ssh_pwd_default_pass") or "" db.close() if pwd_pass: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(target, port=22, username=pwd_user, password=pwd_pass, timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False) return client elif errors is not None: errors.append("Pas de fallback password (ssh_pwd_default_pass non configuré)") except Exception as e: if errors is not None: errors.append(f"Password ({pwd_user if 'pwd_user' in dir() else ssh_user}@{target}:22): {type(e).__name__}: {e}") return None def _run(client, cmd): try: # Test root vs sudo _, stdout, _ = client.exec_command("id -u", timeout=5) uid = stdout.read().decode().strip() if uid == "0": full = cmd else: escaped = cmd.replace("'", "'\"'\"'") full = f"sudo bash -c '{escaped}'" _, stdout, stderr = client.exec_command(full, timeout=15) out = stdout.read().decode("utf-8", errors="replace").strip() err = stderr.read().decode("utf-8", errors="replace").strip() # Fallback sans sudo si sudoers refuse (detection robuste case/accent insensible) SUDO_KW = ["pas autoris", "non autoris", "not allowed to execute", "is not allowed", "no tty present", "sudo:"] err_low = err.lower() sudo_refused = any(kw in err_low for kw in SUDO_KW) if (not out) and err and sudo_refused: _, stdout, stderr = client.exec_command(cmd, timeout=15) out = stdout.read().decode("utf-8", errors="replace").strip() err2 = stderr.read().decode("utf-8", errors="replace").strip() err2_low = err2.lower() still_sudo_err = any(kw in err2_low for kw in SUDO_KW) if still_sudo_err: err = err2 else: # Retry sans sudo a abouti (sortie vide acceptable) err = err2 if err2 else "" if not out and not err: out = "" # explicite : pas de containers / pas de services failed = OK result = out if out else err lines = [l for l in result.splitlines() if not any(b in l for b in BANNER_FILTERS) and l.strip()] return "\n".join(lines).strip() except Exception as e: return f"ERROR: {e}" def audit_single_server(hostname): """Audite un serveur et retourne un dict de resultats""" result = { "hostname": hostname, "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M"), "status": "PENDING", } target = _resolve(hostname) if not target: result["status"] = "CONNECTION_FAILED" result["connection_method"] = f"DNS: aucun suffixe résolu ({hostname})" result["resolved_fqdn"] = None return result result["resolved_fqdn"] = target client = _connect(target, hostname) if not client: result["status"] = "CONNECTION_FAILED" result["connection_method"] = f"SSH: connexion refusée ({target})" return result result["status"] = "OK" ssh_key, ssh_user = _get_ssh_settings() result["connection_method"] = f"ssh_key ({ssh_user}@{target})" for key, cmd in AUDIT_CMDS.items(): result[key] = _run(client, cmd) try: client.close() except Exception: pass # Post-traitement agents = result.get("agents", "") result["qualys_active"] = "qualys" in agents and "active" in agents result["sentinelone_active"] = "sentinelone" in agents and "active" in agents result["disk_alert"] = False for line in (result.get("disk_space") or "").split("\n"): parts = line.split() pcts = [p for p in parts if "%" in p] if pcts: try: pct = int(pcts[0].replace("%", "")) if pct >= 90: result["disk_alert"] = True except ValueError: pass return result def audit_servers_list(hostnames): """Audite une liste de serveurs""" results = [] for hn in hostnames: r = audit_single_server(hn.strip()) results.append(r) return results # ═══════════════════════════════════════════════ # Background audit job manager # ═══════════════════════════════════════════════ import threading import uuid import time as _time _audit_jobs = {} def start_audit_job(hostnames, parallel=3): """Lance un audit en arriere-plan avec pool de threads borne. Retourne le job_id.""" from concurrent.futures import ThreadPoolExecutor job_id = str(uuid.uuid4())[:8] job = { "id": job_id, "started_at": _time.time(), "total": len(hostnames), "done": 0, "servers": {}, "results": [], "finished": False, "parallel": parallel, } for hn in hostnames: job["servers"][hn] = {"hostname": hn, "stage": "pending", "detail": "En attente", "status": None} _audit_jobs[job_id] = job def _run(): with ThreadPoolExecutor(max_workers=max(1, int(parallel))) as pool: for hn in hostnames: pool.submit(_audit_one, job, hn.strip()) job["finished"] = True job["finished_at"] = _time.time() threading.Thread(target=_run, daemon=True).start() return job_id def _audit_one(job, hostname): job["servers"][hostname]["stage"] = "resolving" job["servers"][hostname]["detail"] = "Résolution DNS" target = _resolve(hostname) if not target: job["servers"][hostname]["stage"] = "failed" job["servers"][hostname]["detail"] = "DNS: aucun suffixe résolu" job["servers"][hostname]["status"] = "CONNECTION_FAILED" result = {"hostname": hostname, "status": "CONNECTION_FAILED", "connection_method": f"DNS: aucun suffixe résolu ({hostname})", "resolved_fqdn": None} job["results"].append(result) job["done"] += 1 return job["servers"][hostname]["stage"] = "connecting" job["servers"][hostname]["detail"] = f"Connexion SSH → {target}" client = _connect(target, hostname) if not client: job["servers"][hostname]["stage"] = "failed" job["servers"][hostname]["detail"] = f"SSH refusé ({target})" job["servers"][hostname]["status"] = "CONNECTION_FAILED" result = {"hostname": hostname, "status": "CONNECTION_FAILED", "connection_method": f"SSH: connexion refusée ({target})", "resolved_fqdn": target} job["results"].append(result) job["done"] += 1 return job["servers"][hostname]["stage"] = "auditing" job["servers"][hostname]["detail"] = "Collecte des données" result = {"hostname": hostname, "status": "OK", "resolved_fqdn": target, "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M")} ssh_key, ssh_user = _get_ssh_settings() result["connection_method"] = f"ssh_key ({ssh_user}@{target})" for key, cmd in AUDIT_CMDS.items(): result[key] = _run(client, cmd) try: client.close() except Exception: pass # Post-traitement agents = result.get("agents", "") result["qualys_active"] = "qualys" in agents and "active" in agents result["sentinelone_active"] = "sentinelone" in agents and "active" in agents result["disk_alert"] = False for line in (result.get("disk_space") or "").split("\n"): parts = line.split() pcts = [p for p in parts if "%" in p] if pcts: try: pct = int(pcts[0].replace("%", "")) if pct >= 90: result["disk_alert"] = True except ValueError: pass job["servers"][hostname]["stage"] = "success" job["servers"][hostname]["detail"] = result.get("os_release", "OK") job["servers"][hostname]["status"] = "OK" job["results"].append(result) job["done"] += 1 def get_audit_job(job_id): return _audit_jobs.get(job_id) def list_audit_jobs(): now = _time.time() return {jid: j for jid, j in _audit_jobs.items() if now - j["started_at"] < 3600} def save_audit_to_db(db, results): """Sauvegarde/met a jour les resultats d'audit en base""" updated = 0 inserted = 0 for r in results: hostname = r.get("hostname", "") if not hostname: continue # Trouver server_id srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"), {"h": hostname.split(".")[0]}).fetchone() server_id = srv.id if srv else None audit_date = datetime.now() agents = r.get("agents", "") # Upsert existing = db.execute(text( "SELECT id FROM server_audit WHERE server_id = :sid AND server_id IS NOT NULL" ), {"sid": server_id}).fetchone() if server_id else None if existing: db.execute(text(""" UPDATE server_audit SET status = :st, connection_method = :cm, resolved_fqdn = :rf, os_release = :os, kernel = :k, uptime = :up, selinux = :se, disk_detail = :dd, disk_alert = :da, apps_installed = :ai, services_running = :sr, running_not_enabled = :rne, listening_ports = :lp, db_detected = :db, cluster_detected = :cl, containers = :co, agents = :ag, qualys_active = :qa, sentinelone_active = :s1, failed_services = :fs, audit_date = :ad WHERE id = :id """), { "id": existing.id, "st": r.get("status"), "cm": r.get("connection_method"), "rf": r.get("resolved_fqdn"), "os": r.get("os_release"), "k": r.get("kernel"), "up": r.get("uptime"), "se": r.get("selinux"), "dd": r.get("disk_space"), "da": r.get("disk_alert", False), "ai": r.get("apps_installed"), "sr": r.get("services_running"), "rne": r.get("running_not_enabled"), "lp": r.get("listening_ports"), "db": r.get("db_detect"), "cl": r.get("cluster_detect"), "co": r.get("containers"), "ag": agents, "qa": r.get("qualys_active", False), "s1": r.get("sentinelone_active", False), "fs": r.get("failed_services"), "ad": audit_date, }) updated += 1 else: db.execute(text(""" INSERT INTO server_audit (server_id, hostname, audit_date, status, connection_method, resolved_fqdn, os_release, kernel, uptime, selinux, disk_detail, disk_alert, apps_installed, services_running, running_not_enabled, listening_ports, db_detected, cluster_detected, containers, agents, qualys_active, sentinelone_active, failed_services) VALUES (:sid, :hn, :ad, :st, :cm, :rf, :os, :k, :up, :se, :dd, :da, :ai, :sr, :rne, :lp, :db, :cl, :co, :ag, :qa, :s1, :fs) """), { "sid": server_id, "hn": hostname, "ad": audit_date, "st": r.get("status"), "cm": r.get("connection_method"), "rf": r.get("resolved_fqdn"), "os": r.get("os_release"), "k": r.get("kernel"), "up": r.get("uptime"), "se": r.get("selinux"), "dd": r.get("disk_space"), "da": r.get("disk_alert", False), "ai": r.get("apps_installed"), "sr": r.get("services_running"), "rne": r.get("running_not_enabled"), "lp": r.get("listening_ports"), "db": r.get("db_detect"), "cl": r.get("cluster_detect"), "co": r.get("containers"), "ag": agents, "qa": r.get("qualys_active", False), "s1": r.get("sentinelone_active", False), "fs": r.get("failed_services"), }) inserted += 1 # Mettre a jour la table servers avec les infos de l'audit if server_id and r.get("status") == "OK": resolved = r.get("resolved_fqdn", "") # Resoudre l'IP depuis le FQDN ip_addr = None if resolved: try: ip_addr = socket.gethostbyname(resolved) except Exception: pass from .itop_service import _normalize_os_for_itop updates = {} if r.get("os_release"): updates["os_version"] = _normalize_os_for_itop(r["os_release"].strip()) if ip_addr: updates["fqdn"] = resolved if updates: sets = ", ".join(f"{k} = :{k}" for k in updates) updates["sid"] = server_id db.execute(text(f"UPDATE servers SET {sets}, updated_at = NOW() WHERE id = :sid"), updates) # Mettre a jour/inserer dans server_ips if ip_addr: existing_ip = db.execute(text( "SELECT id FROM server_ips WHERE server_id = :sid AND ip_address = :ip" ), {"sid": server_id, "ip": ip_addr}).fetchone() if not existing_ip: db.execute(text( "INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'audit')" ), {"sid": server_id, "ip": ip_addr}) db.commit() return updated, inserted # =========================================================================== # AUDIT CIBLE QUALYS AGENT — pour bouton "Check" sur page Agents inactifs # Utilise la meme mecanique de connexion que audit_single_server (DB-driven) # =========================================================================== QUALYS_AGENT_CMDS = { "os_release": "cat /etc/redhat-release 2>/dev/null || (grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2) || uname -sr", "agent_status": ( "if command -v systemctl >/dev/null 2>&1; then " " systemctl status qualys-cloud-agent --no-pager 2>&1 | head -25; " "elif [ -x /etc/init.d/qualys-cloud-agent ]; then " " /etc/init.d/qualys-cloud-agent status 2>&1 | head -25; " "elif command -v service >/dev/null 2>&1; then " " service qualys-cloud-agent status 2>&1 | head -25; " "else " " echo '--- ps (init system inconnu) ---'; " " ps -ef 2>/dev/null | grep -i qualys-cloud-agent | grep -v grep | head -5 || echo 'aucun process Qualys'; " "fi" ), "agent_version": ( "(rpm -q qualys-cloud-agent 2>/dev/null) || " "(dpkg -l qualys-cloud-agent 2>/dev/null | awk '/^ii/{print $2,$3}') || " "(/usr/local/qualys/cloud-agent/bin/qualys-cloud-agent.sh -v 2>&1) || " "echo 'version introuvable'" ), "agent_log": ( "for f in /var/log/qualys/qualys-cloud-agent.log " "/var/log/qualys-cloud-agent/qualys-cloud-agent.log " "/usr/local/qualys/cloud-agent/log/qualys-cloud-agent.log " "/var/log/qualysagent/qualysagent.log; do " " if [ -e \"$f\" ]; then " " out=$(tail -50 \"$f\" 2>/dev/null || sudo -n tail -50 \"$f\" 2>/dev/null); " " if [ -n \"$out\" ]; then echo \"=== $f ===\"; echo \"$out\"; exit 0; fi; " " echo \"=== $f (existe mais non lisible — sudo refuse) ===\"; " " fi; " "done; " "echo 'log Qualys introuvable. Chemins testes: /var/log/qualys/*, /var/log/qualys-cloud-agent/*, /usr/local/qualys/cloud-agent/log/*, /var/log/qualysagent/*'" ), "disk_space": ( "echo '=== Disque global ==='; " "df -h 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem|overlay|/dev/loop)' | head -15; " "echo; echo '=== /var/log (partition agent) ==='; " "df -h /var/log 2>/dev/null | tail -1; " "echo; echo '=== Top 5 dossiers /var/log ==='; " "(du -sh /var/log/* 2>/dev/null | sort -rh | head -5) || (sudo -n du -sh /var/log/* 2>/dev/null | sort -rh | head -5) || echo '(non lisible)'" ), "qualys_connectivity": ( # Test minimal : si HTTP code reçu -> connectivité OK. Sinon dérouler le diag. "URL=https://qagpublic.qg1.apps.qualys.eu/Qlys/CloudAgent/status; " "if command -v curl >/dev/null 2>&1; then " " CODE=$(curl --connect-timeout 5 -sS -o /dev/null -w '%{http_code}' \"$URL\" 2>/dev/null); " " if [ -n \"$CODE\" ] && [ \"$CODE\" != \"000\" ]; then " " echo \"✓ Connectivité OK (HTTP $CODE depuis $URL)\"; " " else " " echo '✗ Connectivité KO — diag détaillé :'; echo; " " echo '--- DNS ---'; " " (getent hosts qagpublic.qg1.apps.qualys.eu 2>/dev/null || nslookup qagpublic.qg1.apps.qualys.eu 2>/dev/null | tail -3) || echo 'DNS KO'; " " echo '--- curl verbeux ---'; " " curl --connect-timeout 5 -v -sS -o /dev/null \"$URL\" 2>&1 | grep -E 'Trying|Connected|connect|Could not|refused|timed out|verify|SSL' | head -10; " " fi; " "else echo '(curl absent — impossible de tester)'; fi" ), "lvm_info": ( "echo '=== Volume Groups (espace libre dans le VG) ==='; " "(sudo -n vgs --noheadings --units g -o vg_name,vg_size,vg_free 2>/dev/null || " " vgs --noheadings --units g -o vg_name,vg_size,vg_free 2>/dev/null) | head -10 || echo '(pas LVM ou commande non autorisee)'; " "echo; echo '=== Logical Volumes (filtre log/var) ==='; " "(sudo -n lvs --noheadings --units g -o lv_name,vg_name,lv_size,lv_attr 2>/dev/null || " " lvs --noheadings --units g -o lv_name,vg_name,lv_size,lv_attr 2>/dev/null) | grep -iE 'log|var' || echo '(pas de LV log/var ou non lisible)'; " "echo; echo '=== FS type sur /var/log ==='; " "(stat -f -c '%T' /var/log 2>/dev/null) || (df -T /var/log 2>/dev/null | awk 'NR==2{print $2}') || echo '(stat KO)'" ), "logrotate_config": ( "FOUND=0; " "for f in /etc/logrotate.d/qualys-cloud-agent /etc/logrotate.d/qualys " "/etc/logrotate.d/qualysagent; do " " if [ -e \"$f\" ]; then echo \"=== $f ===\"; (cat \"$f\" 2>/dev/null || sudo -n cat \"$f\" 2>/dev/null); FOUND=1; fi; " "done; " "if [ $FOUND -eq 0 ]; then echo '(pas de config logrotate dediee Qualys — l agent gere ses logs en interne)'; fi; " "echo; echo '=== /etc/qualys/cloud-agent/qagent-log.conf ==='; " "(cat /etc/qualys/cloud-agent/qagent-log.conf 2>/dev/null || sudo -n cat /etc/qualys/cloud-agent/qagent-log.conf 2>/dev/null) || echo '(non trouve / non lisible)'" ), "system_log": ( "if command -v journalctl >/dev/null 2>&1; then " " out=$(journalctl -u qualys-cloud-agent --no-pager -n 50 2>/dev/null || sudo -n journalctl -u qualys-cloud-agent --no-pager -n 50 2>/dev/null); " " if [ -n \"$out\" ]; then echo \"$out\"; else echo '(journalctl: aucune entree ou non autorise)'; fi; " "elif [ -e /var/log/messages ]; then " " echo '--- /var/log/messages (filtre qualys, 50 derniers) ---'; " " out=$(grep -i qualys /var/log/messages 2>/dev/null | tail -50 || sudo -n grep -i qualys /var/log/messages 2>/dev/null | tail -50); " " if [ -n \"$out\" ]; then echo \"$out\"; else echo '(aucune entree qualys ou sudo refuse)'; fi; " "elif [ -e /var/log/syslog ]; then " " echo '--- /var/log/syslog (filtre qualys, 50 derniers) ---'; " " out=$(grep -i qualys /var/log/syslog 2>/dev/null | tail -50 || sudo -n grep -i qualys /var/log/syslog 2>/dev/null | tail -50); " " if [ -n \"$out\" ]; then echo \"$out\"; else echo '(aucune entree qualys ou sudo refuse)'; fi; " "else " " echo 'logs systeme indisponibles (journalctl absent, messages/syslog non trouves)'; " "fi" ), } import threading as _threading _qualys_audit_cache = {} # hostname -> {status, result, started_at, finished_at, error} _qualys_audit_lock = _threading.Lock() def _analyze_qualys_audit(r): """Analyse les sorties d'audit pour suggerer des resolutions concretes. Retourne liste de {severity: critical|high|medium|low, title, fix}.""" import re suggestions = [] s_status = (r.get("agent_status") or "").lower() s_log = (r.get("agent_log") or "").lower() s_sys = (r.get("system_log") or "").lower() s_disk = (r.get("disk_space") or "") s_conn = (r.get("qualys_connectivity") or "").lower() s_ver = (r.get("agent_version") or "") s_os_lower = (r.get("os_release") or "").lower() # Early exit RHEL 5 uniquement (vraiment EOL / agent moderne incompatible) if "release 5" in s_os_lower: return [{ "severity": "info", "title": "OS en fin de vie (RHEL 5) — agent legacy", "fix": "Constat : RHEL 5 EOL. L'agent installé est forcément ancien (Qualys 7.x non supporté). " "Les éventuels warnings TLS, version agent, etc. sont des conséquences attendues " "et ne sont pas actionnables tant que le serveur est en place. " "Proposition : à intégrer au plan de migration / décommissionnement de la VM." }] s_lvm = (r.get("lvm_info") or "") s_lrt = (r.get("logrotate_config") or "").lower() # Disque saturé / agent ne peut écrire disk_full = " 100%" in s_disk or "no space left" in (s_log + s_sys) if disk_full: suggestions.append({ "severity": "critical", "title": "Partition /var/log apparaît saturée", "fix": "Constat : une partition est à 100% et/ou l'agent rapporte 'no space left'. " "Hypothèse : pourrait empêcher l'agent d'écrire ses logs.\n" "Proposition : ouvrir un ticket support pour vérifier l'état du disque " "et envisager soit un cleanup des logs archivés, soit une extension du FS " "(snapshot vCenter recommandé avant toute action)." }) # Si LVM avec free dans VG -> note diagnostic m_vg = re.search(r"(\S+)\s+([\d.]+)g\s+([\d.]+)g", s_lvm.lower()) if m_vg and float(m_vg.group(3)) > 0.5: vg_name = m_vg.group(1) free_gb = float(m_vg.group(3)) suggestions.append({ "severity": "high", "title": f"LVM : extension FS potentiellement possible (VG {vg_name} ~{free_gb}G libres)", "fix": f"Constat : le VG {vg_name} semble disposer de ~{free_gb}G non alloués. " f"Proposition : ticket support pour validation et extension du LV /var/log si pertinent " f"(snapshot vCenter à prévoir au préalable)." }) if "cannot write file" in s_sys or "logger initialization failed" in s_sys: suggestions.append({ "severity": "critical", "title": "L'agent semble ne pas pouvoir écrire son log", "fix": "Constat : présence de 'Cannot write file' / 'Logger initialization failed' dans les logs. " "Hypothèses possibles : disque saturé, permissions cassées sur /var/log/qualys. " "Proposition : ticket support pour analyse." }) # Crash loop m = re.search(r"restart counter is at (\d+)", s_sys) if m and int(m.group(1)) > 50: suggestions.append({ "severity": "high", "title": f"Possible boucle de redémarrage (~{m.group(1)} restarts observés)", "fix": "Constat : le compteur de restart systemd est élevé. " "Hypothèse : crash loop persistant. " "Proposition : ticket support pour analyse de la cause racine et arrêt temporaire du service " "le temps de l'investigation (afin de limiter le bruit dans les logs)." }) # Connectivité KO if any(k in s_conn for k in ["connexion directe echec", "connection refused", "timed out", "could not resolve", "no route", "unreachable", "dns ko"]): suggestions.append({ "severity": "high", "title": "Connectivité Qualys cloud apparaît KO (flux direct)", "fix": "Constat : le test direct vers qagpublic.qg1.apps.qualys.eu:443 ne répond pas. " "Hypothèse : flux 443 sortant peut-être bloqué côté firewall périmétrique. " "Proposition : ticket réseau pour vérifier les règles de flux applicables à ce serveur, " "en comparaison avec un serveur où l'agent fonctionne." }) if "certificate verify failed" in s_conn or ("ssl" in s_conn and "verify" in s_conn): suggestions.append({ "severity": "high", "title": "Erreur TLS/SSL observée", "fix": "Constat : la vérification du certificat semble échouer. " "Hypothèses : interception SSL par un équipement intermédiaire, ou bundle CA système à mettre à jour. " "Proposition : ticket support sécu/réseau pour analyse." }) # Service désactivé / arrêté if "masked" in s_status: suggestions.append({ "severity": "medium", "title": "Service apparaît masked", "fix": "Constat : le service systemd Qualys semble masqué. " "Proposition : ticket support pour identifier la raison (manuel ? configuration ?) " "et décider de l'action." }) elif "disabled" in s_status: suggestions.append({ "severity": "medium", "title": "Service apparaît disabled au boot", "fix": "Constat : le service ne démarrera pas automatiquement au prochain reboot. " "Hypothèses : volontaire (serveur en décom) ou oubli post-intervention. " "Proposition : ticket support pour clarifier." }) elif any(k in s_status for k in ["inactive (dead)", "stopped", "not running"]) \ and "active" not in s_status: suggestions.append({ "severity": "medium", "title": "Service apparaît arrêté", "fix": "Constat : l'agent ne semble plus en cours d'exécution. " "Proposition : ticket support pour identifier la cause de l'arrêt (voir logs ci-dessous)." }) # Agent obsolète if re.match(r"^qualys-cloud-agent-([0-5]\.|6\.[01]\.)", s_ver): suggestions.append({ "severity": "low", "title": f"Version agent ancienne détectée ({s_ver.strip()})", "fix": "Constat : version antérieure à 7.x, potentiellement plus supportée. " "Proposition : à inclure dans le plan de MAJ des agents Qualys." }) # Logrotate Qualys mal configuré if s_lrt and "qualys" in s_lrt: if "compress" in s_lrt and ("nocompress" in s_lrt or not re.search(r"^\s*compress\s*$", s_lrt, re.MULTILINE)): suggestions.append({ "severity": "medium", "title": "Logrotate Qualys : compression semble désactivée", "fix": "Constat : la directive 'compress' n'apparaît pas active. " "Hypothèse : les logs archivés peuvent occuper 5-10× plus de place sans compression. " "Proposition : ticket support pour vérifier la configuration logrotate." }) if "rotate" not in s_lrt: suggestions.append({ "severity": "low", "title": "Logrotate Qualys : directive rotate absente", "fix": "Constat : pas de politique de rétention détectée. " "Hypothèse : les logs pourraient s'accumuler indéfiniment. " "Proposition : ticket support pour ajout d'une politique adaptée." }) # Core dump + package absent = installation incomplète / corrompue if ("core-dump" in s_sys or "core dumped" in s_sys or "abrt" in s_sys.lower()) and \ ("introuvable" in s_ver.lower() or s_ver.lower().startswith("version introuvable")): suggestions.append({ "severity": "critical", "title": "Possible installation Qualys cassée (core dump + package non détecté)", "fix": "Constat : service systemd actif mais aucun package qualys-cloud-agent dans la base RPM, " "et core dumps répétés. " "Hypothèse : désinstallation incomplète ou installation corrompue. " "Proposition : ticket support pour cleanup complet et réinstallation via RPM SANEF " "(/root/QualysCloudAgent.rpm) avec ré-activation." }) elif "core-dump" in s_sys or "core dumped" in s_sys: suggestions.append({ "severity": "high", "title": "Core dumps observés sur l'agent", "fix": "Constat : le binaire semble crasher au démarrage (signal SIGABRT). " "Hypothèses : lib système cassée, incompatibilité version agent vs OS, conflit. " "Proposition : ticket support pour analyse approfondie." }) # OS RHEL 6 : agent peut tourner si CA bundle à jour if "release 6" in s_os_lower: suggestions.append({ "severity": "info", "title": "OS RHEL 6 — agent legacy supporté avec CA bundle à jour", "fix": "Constat : RHEL 6, OS proche/en EOL mais agent Qualys 6.x fonctionnel. " "Hypothèse : si erreurs TLS observées, souvent dues au bundle CA système obsolète " "(certificat racine DigiCert manquant). " "Proposition : ticket support pour mise à jour ca-certificates si TLS KO. " "Sinon, à intégrer au plan de migration à terme." }) return suggestions def start_qualys_audit_async(hostname, force=False): """Lance audit_qualys_agent_only en background. Reuse run pending récent (<2min).""" with _qualys_audit_lock: existing = _qualys_audit_cache.get(hostname) if existing and existing.get("status") == "pending" and not force: age = (datetime.now() - existing["started_at"]).total_seconds() if age < 120: return False _qualys_audit_cache[hostname] = { "status": "pending", "result": None, "started_at": datetime.now(), "finished_at": None, "error": None, } def _runner(): try: res = audit_qualys_agent_only(hostname) with _qualys_audit_lock: state = _qualys_audit_cache.get(hostname, {}) state.update({ "status": "ok", "result": res, "finished_at": datetime.now(), }) _qualys_audit_cache[hostname] = state except Exception as ex: with _qualys_audit_lock: state = _qualys_audit_cache.get(hostname, {}) state.update({ "status": "error", "error": str(ex), "finished_at": datetime.now(), }) _qualys_audit_cache[hostname] = state t = _threading.Thread(target=_runner, daemon=True) t.start() return True def get_qualys_audit_state(hostname): with _qualys_audit_lock: return dict(_qualys_audit_cache.get(hostname, {})) or None def audit_qualys_agent_only(hostname): """Audit cible Qualys Agent uniquement: status service + version + logs. Utilise _resolve + _connect + _run comme audit_single_server. Retourne dict {hostname, status, connection_method, resolved_fqdn, ...cmds}.""" result = { "hostname": hostname, "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "status": "PENDING", "connection_method": None, "resolved_fqdn": None, } for k in QUALYS_AGENT_CMDS: result[k] = None target = _resolve(hostname) if not target: result["status"] = "CONNECTION_FAILED" result["fail_reason"] = "DNS_NOT_RESOLVED" result["fail_detail"] = ( f"Aucun nom DNS résolu pour {hostname} (testé sans suffixe puis avec " f".sanef.groupe / .sanef.fr / .sanef-rec.fr / .mpcz.fr). " "Le hostname est peut-être incorrect, le DNS interne KO, ou le serveur n'a jamais existé / a été décommissionné." ) result["connection_method"] = f"DNS: aucun suffixe resolu ({hostname})" return result result["resolved_fqdn"] = target # Test TCP/22 avant SSH pour identifier la cause précise (timeout / refused / OK) tcp_status = "unknown" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) rc = sock.connect_ex((target, 22)) sock.close() if rc == 0: tcp_status = "open" elif rc in (111, 10061): # ECONNREFUSED tcp_status = "refused" else: tcp_status = f"errno {rc}" except socket.timeout: tcp_status = "timeout" except Exception as ex_tcp: tcp_status = f"err: {ex_tcp}" if tcp_status == "timeout": result["status"] = "CONNECTION_FAILED" result["fail_reason"] = "TCP_TIMEOUT" result["fail_detail"] = ( f"Port 22/TCP injoignable sur {target} (timeout 5s). " "Le serveur est probablement éteint, le flux SSH bloqué côté firewall, " "ou la VM en panne réseau." ) result["connection_method"] = f"DNS OK ({target}) — TCP/22 timeout" return result if tcp_status == "refused": result["status"] = "CONNECTION_FAILED" result["fail_reason"] = "TCP_REFUSED" result["fail_detail"] = ( f"Port 22/TCP rejette la connexion sur {target} (RST/refused). " "Le service sshd est probablement arrêté ou refuse activement. " "Le serveur lui-même répond mais pas SSH." ) result["connection_method"] = f"DNS OK ({target}) — TCP/22 refused" return result if tcp_status not in ("open",): result["status"] = "CONNECTION_FAILED" result["fail_reason"] = "TCP_UNKNOWN" result["fail_detail"] = f"Test TCP/22 inattendu sur {target} : {tcp_status}" result["connection_method"] = f"DNS OK — TCP {tcp_status}" return result client = _connect(target, hostname) if not client: result["status"] = "CONNECTION_FAILED" result["fail_reason"] = "SSH_AUTH_OR_HANDSHAKE_FAILED" method_cfg = _resolve_ssh_method(hostname) or "ssh_key (default)" result["fail_detail"] = ( f"TCP/22 ouvert sur {target} mais la connexion SSH échoue (handshake ou authentification). " f"Méthode SSH configurée pour ce serveur : {method_cfg}. " "Pistes : clé SSH non autorisée pour ce serveur, méthode PSMP/key/password mal configurée " "côté PatchCenter (settings), ou compte SSH désactivé / cassé sur le serveur." ) result["connection_method"] = f"TCP/22 OK ({target}) — SSH auth/handshake KO" return result method = _resolve_ssh_method(hostname) or "ssh_key" result["connection_method"] = f"{method} -> {target}" try: # Toutes les commandes dans 1 seul script bash avec markers — 1 channel SSH unique. # Evite le "Timeout opening channel" sur PSMP qui limite le nombre de channels. combined_parts = [] for key, cmd in QUALYS_AGENT_CMDS.items(): combined_parts.append(f"echo '__SECTION_{key}_START__'") combined_parts.append(cmd) combined_parts.append(f"echo '__SECTION_{key}_END__'") combined = "; ".join(combined_parts) # exec_command direct avec timeout plus long (60s) car script combiné = curl 5s + plusieurs commandes try: _, stdout_chk, _ = client.exec_command("id -u", timeout=5) uid = stdout_chk.read().decode().strip() full_cmd = combined if uid == "0" else "sudo bash -c '" + combined.replace("'", "'\"'\"'") + "'" _, stdout, stderr = client.exec_command(full_cmd, timeout=60) big_out = stdout.read().decode("utf-8", errors="replace") err = stderr.read().decode("utf-8", errors="replace") if not big_out.strip() and err.strip(): # Fallback retry sans sudo si sudoers refuse _, stdout2, _ = client.exec_command(combined, timeout=60) big_out = stdout2.read().decode("utf-8", errors="replace") except Exception as ex_inner: big_out = f"ERROR: {ex_inner}" # Parser la sortie en cherchant les markers for key in QUALYS_AGENT_CMDS: start_marker = f"__SECTION_{key}_START__" end_marker = f"__SECTION_{key}_END__" try: section = big_out.split(start_marker, 1)[1].split(end_marker, 1)[0].strip() except Exception: section = "(parsing failed)" result[key] = section or "(empty)" result["status"] = "OK" except Exception as e: result["status"] = "ERROR" result["error_msg"] = str(e) finally: try: client.close() except Exception: pass # Analyser les sorties pour suggerer des resolutions if result["status"] == "OK": result["suggestions"] = _analyze_qualys_audit(result) # Resume binaire pour la checklist en tete de page s_conn = (result.get("qualys_connectivity") or "") s_disk = (result.get("disk_space") or "") s_status = (result.get("agent_status") or "").lower() s_ver = (result.get("agent_version") or "") s_sys = (result.get("system_log") or "") result["check_connectivity"] = "Connectivité OK" in s_conn result["check_disk"] = " 100%" not in s_disk # Multi-format : systemd ("active (running)"), SysV anglais ("is running" / "(pid"), # SysV français RHEL 5/6 ("en cours d'exécution") is_running = ( "active (running)" in s_status or "(pid " in s_status or "is running" in s_status or "en cours d'exécution" in s_status or "en cours d'execution" in s_status # sans accent ) is_stopped = any(k in s_status for k in [ "inactive (dead)", "stopped", "not running", "n'est pas en cours", "is not running" ]) result["check_service"] = is_running and not is_stopped result["check_installed"] = bool(re.search(r"qualys-cloud-agent[-\s]\d", s_ver)) # Ligne saturee si disque KO result["disk_saturated_line"] = "" if not result["check_disk"]: for ln in s_disk.split("\n"): if " 100%" in ln: result["disk_saturated_line"] = ln.strip() break # Garder seulement les 10 dernieres lignes du log systeme sys_lines = [l for l in s_sys.split("\n") if l.strip()] result["system_log"] = "\n".join(sys_lines[-10:]) else: result["suggestions"] = [] result["check_connectivity"] = False result["check_disk"] = False result["check_service"] = False result["check_installed"] = False result["disk_saturated_line"] = "" return result