patchcenter/app/services/realtime_audit_service.py

949 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Service audit temps reel — lance des checks SSH et retourne les resultats"""
import socket
import json
import re
from datetime import datetime
from sqlalchemy import text
try:
import paramiko
PARAMIKO_OK = True
except ImportError:
PARAMIKO_OK = False
# Fallback private-key path used when no key is configured in app_secrets.
SSH_KEY_DEFAULT = "/opt/patchcenter/keys/id_ed25519"
# Fallback SSH login used when no user is configured in app_secrets.
SSH_USER_DEFAULT = "root"
# Timeout (seconds) passed to the paramiko connect / start_client calls.
SSH_TIMEOUT = 12
# Suffixes appended to a hostname when the "ssh_dns_suffixes" secret is unset;
# the empty string means "try the hostname as given".
DNS_SUFFIXES_DEFAULT = ["", ".mpcz.fr", ".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"]
def _get_dns_suffixes():
    """Return the DNS suffixes to try when resolving a short hostname.

    The list is read from the ``ssh_dns_suffixes`` secret (comma-separated).
    Entries are stripped; an entry that is exactly empty is kept so that the
    bare hostname can be tried. When the secret is missing, empty, or cannot
    be read, a copy of ``DNS_SUFFIXES_DEFAULT`` is returned.
    """
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            val = get_secret(db, "ssh_dns_suffixes")
        finally:
            # Always release the session, even if the secret lookup fails.
            db.close()
        if val:
            return [x.strip() for x in val.split(",") if x.strip() or x == ""]
    except Exception:
        # Best effort: any DB/import problem falls back to the defaults.
        pass
    # Return a copy so callers can never mutate the module-level default.
    return list(DNS_SUFFIXES_DEFAULT)
def _get_ssh_settings():
    """Read the SSH key material and login user from app_secrets.

    Returns:
        tuple: ``(key_material, user)``. ``key_material`` is the PEM content
        stored in ``ssh_key_private_key`` when present, otherwise the legacy
        path stored in ``ssh_key_file`` (or ``SSH_KEY_DEFAULT``). ``user`` is
        taken from ``ssh_key_default_user``, then ``ssh_user``, then
        ``SSH_USER_DEFAULT``. On any error the two defaults are returned.
    """
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            # New style: PEM content stored directly in the secret.
            key_material = get_secret(db, "ssh_key_private_key")
            if not key_material:
                key_material = get_secret(db, "ssh_key_file") or SSH_KEY_DEFAULT
            user = (
                get_secret(db, "ssh_key_default_user")
                or get_secret(db, "ssh_user")
                or SSH_USER_DEFAULT
            )
        finally:
            # Always release the session, even if a secret lookup fails.
            db.close()
        return key_material, user
    except Exception:
        return SSH_KEY_DEFAULT, SSH_USER_DEFAULT
# Audit commands (simplified for the real-time audit). Each value is a shell
# one-liner executed remotely by _run(); the key becomes the result field name.
# The ".service" suffix is stripped with an escaped, end-anchored pattern
# (s/\.service$//): the previous "s/.service//" treated "." as a wildcard and
# mangled unit names such as "myservice.service".
AUDIT_CMDS = {
    "os_release": "cat /etc/redhat-release 2>/dev/null || grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2",
    "kernel": "uname -r",
    "uptime": "uptime -p 2>/dev/null || uptime",
    "selinux": "getenforce 2>/dev/null || echo N/A",
    "disk_space": "df -h --output=target,size,avail,pcent 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem)' | sort",
    "apps_installed": "rpm -qa --qf '%{NAME} %{VERSION}\\n' 2>/dev/null | grep -iE 'tomcat|java|jdk|nginx|httpd|haproxy|docker|podman|postgresql|postgres|mysql|mariadb|mongodb|oracle|redis|elasticsearch|splunk|centreon|qualys' | sort -u",
    "services_running": "systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/\\.service$//' | sort",
    "running_not_enabled": "comm -23 <(systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/\\.service$//' | sort) <(systemctl list-unit-files --type=service --state=enabled --no-pager --no-legend 2>/dev/null | awk '{print $1}' | sed 's/\\.service$//' | sort) 2>/dev/null || echo none",
    "listening_ports": "ss -tlnp 2>/dev/null | grep LISTEN | grep -vE ':22 |:111 |:323 ' | awk '{print $4, $6}' | sort",
    "db_detect": "for svc in postgresql mariadbd mysqld mongod redis-server; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:active\"; done; pgrep -x ora_pmon >/dev/null 2>&1 && echo 'oracle:active' || true",
    "cluster_detect": "(which pcs 2>/dev/null && pcs status 2>/dev/null | head -3) || (test -f /etc/corosync/corosync.conf && echo 'corosync:present') || echo 'no_cluster'",
    "containers": "if which podman >/dev/null 2>&1; then USERS=$(ps aux 2>/dev/null | grep -E 'conmon|podman' | grep -v grep | awk '{print $1}' | sort -u); for U in $USERS; do echo \"=== podman@$U ===\"; su - $U -c 'podman ps -a --format \"table {{.Names}} {{.Status}}\"' 2>/dev/null; done; fi; if which docker >/dev/null 2>&1; then docker ps -a --format 'table {{.Names}} {{.Status}}' 2>/dev/null; fi",
    "agents": "for svc in qualys-cloud-agent sentinelone zabbix-agent; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:$state\"; done",
    "failed_services": "systemctl list-units --type=service --state=failed --no-pager --no-legend 2>/dev/null | awk '{print $2}' | head -10 || echo none",
    "satellite": "subscription-manager identity 2>/dev/null | grep -i 'org\\|server' || echo 'not_registered'",
}
# Substrings that identify login-banner / session-notice lines. _run() drops
# every output line containing one of these before returning the result.
BANNER_FILTERS = [
    "GROUPE SANEF", "propriété du Groupe", "accèderait", "emprisonnement",
    "Article 323", "code pénal", "Authorized uses only", "CyberArk",
    "This session", "session is being",
]
def _ordered_suffixes(hostname):
    """Order the configured DNS suffixes by the hostname's second letter.

    SANEF naming convention: r = recette, p = prod, i = infra. Hosts whose
    second letter is "r" try the recette domain first; every other host tries
    the group domain first. Configured suffixes that are not part of the
    preferred list are appended afterwards, in their configured order, as
    fallbacks.
    """
    configured = _get_dns_suffixes()
    env_letter = hostname[1].lower() if len(hostname) > 1 else ""

    if env_letter == "r":
        preferred = (".sanef-rec.fr", ".sanef.groupe", ".sanef.fr")
    else:
        # "p", "i" and any other letter share the same ordering.
        preferred = (".sanef.groupe", ".sanef-rec.fr", ".sanef.fr")

    # Preferred suffixes first, but only those actually configured.
    result = [candidate for candidate in preferred if candidate in configured]
    # Then everything else, skipping what is already listed.
    for candidate in configured:
        if candidate not in result:
            result.append(candidate)
    return result
def _resolve(hostname):
    """Find a reachable name for ``hostname``.

    1. If the ``servers`` table holds a non-empty ``fqdn`` for this hostname
       (case-insensitive match), return it directly without any port check.
    2. Otherwise try ``hostname + suffix`` for each suffix returned by
       ``_ordered_suffixes`` and return the first one accepting a TCP
       connection on port 22 within 2 seconds.

    Returns:
        str | None: the FQDN/target to connect to, or None if nothing worked.
    """
    # 1. FQDN stored in the database: returned as-is (fast path).
    try:
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            row = db.execute(text(
                "SELECT fqdn FROM servers WHERE LOWER(hostname)=LOWER(:h) "
                "AND fqdn IS NOT NULL AND fqdn != ''"
            ), {"h": hostname}).fetchone()
        finally:
            # Release the session even when the query fails.
            db.close()
        if row and row.fqdn:
            return row.fqdn
    except Exception:
        # Best effort: fall through to the DNS suffix loop.
        pass
    # 2. Fallback: loop over DNS suffixes (FQDN missing in the database).
    for suffix in _ordered_suffixes(hostname):
        target = hostname + suffix
        try:
            # The context manager closes the socket on every path, including
            # when connect_ex raises (e.g. name resolution failure).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                if sock.connect_ex((target, 22)) == 0:
                    return target
        except Exception:
            continue
    return None
def _connect_via_psmp(target):
    """Connect through the CyberArk PSMP gateway (keyboard-interactive auth).

    Gateway host/port, CyberArk user, target user and password are read from
    app_secrets (with built-in fallbacks for everything except the password).
    The login sent to the gateway is ``<cyberark_user>@<target_user>@<target>``
    and the vault password answers every interactive prompt.

    Returns:
        paramiko.SSHClient | None: a client bound to the authenticated
        transport, or None if paramiko is unavailable, no password is
        configured, authentication fails, or any error occurs.
    """
    if not PARAMIKO_OK:
        return None
    transport = None
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            psmp_host = get_secret(db, "psmp_host") or "psmp.sanef.fr"
            psmp_port = int(get_secret(db, "psmp_port") or "22")
            cyber_user = get_secret(db, "psmp_cyberark_user") or "CYBP01336"
            target_user = get_secret(db, "psmp_target_user") or "cybsecope"
            password = get_secret(db, "ssh_pwd_default_pass") or ""
        finally:
            # Release the session even if a secret lookup fails.
            db.close()
        if not password:
            return None
        username = f"{cyber_user}@{target_user}@{target}"
        transport = paramiko.Transport((psmp_host, psmp_port))
        transport.start_client(timeout=SSH_TIMEOUT)
        # Answer every prompt of the interactive exchange with the password.
        transport.auth_interactive(
            username,
            lambda title, instructions, prompts: [password] * len(prompts),
        )
        if not transport.is_authenticated():
            transport.close()
            return None
        client = paramiko.SSHClient()
        # SSHClient has no public API to adopt an existing transport, hence
        # the private attribute assignment.
        client._transport = transport
        return client
    except Exception:
        # Do not leak the gateway connection when the handshake/auth fails.
        if transport is not None:
            try:
                transport.close()
            except Exception:
                pass
        return None
def _resolve_ssh_method(hostname):
    """Return the ``ssh_method`` configured for the server, if any.

    The lookup uses the short name (text before the first dot) and is
    case-insensitive. Expected values are ssh_psmp / ssh_key / ssh_password.

    Returns:
        str | None: the stored method, or None when the server is unknown or
        the lookup fails.
    """
    try:
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            row = db.execute(text(
                "SELECT ssh_method FROM servers WHERE LOWER(hostname)=LOWER(:h)"
            ), {"h": hostname.split(".")[0]}).fetchone()
        finally:
            # Release the session even when the query fails.
            db.close()
        return row.ssh_method if row else None
    except Exception:
        return None
def _open_ssh_client(target, username, **auth):
    """Open a direct SSH session to ``target`` on port 22.

    ``auth`` carries the credential keyword for ``SSHClient.connect``
    (``pkey=...`` or ``password=...``). Returns the connected client, or None
    on any failure; a client that failed to connect is closed first.
    """
    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification, which exposes the connection to man-in-the-middle
        # attacks. Kept as-is to preserve behaviour; review before hardening.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(target, port=22, username=username,
                       timeout=SSH_TIMEOUT, look_for_keys=False,
                       allow_agent=False, **auth)
        return client
    except Exception:
        if client is not None:
            try:
                client.close()
            except Exception:
                pass
        return None


def _connect(target, hostname=None):
    """Open an SSH connection to ``target`` using the configured methods.

    Order of attempts:
      1. PSMP gateway, when the server's ``ssh_method`` is ``ssh_psmp``
         (looked up with ``hostname`` if given, else ``target``); a PSMP
         failure falls through to direct SSH.
      2. Private key from the settings: PEM content, or a legacy file path if
         it exists. The key is tried as Ed25519, RSA, then ECDSA.
      3. Password from the ``ssh_pwd_default_*`` secrets.

    Returns:
        paramiko.SSHClient | None: a connected client, or None if paramiko is
        missing or every attempt failed.
    """
    if not PARAMIKO_OK:
        return None
    import os
    from io import StringIO

    # PSMP routing when ssh_method='ssh_psmp' for this server.
    method = _resolve_ssh_method(hostname or target)
    if method == "ssh_psmp":
        client = _connect_via_psmp(target)
        if client:
            return client
        # PSMP unavailable: fall back to direct SSH below.
    ssh_key, ssh_user = _get_ssh_settings()

    # 1. SSH key from the settings (PEM content or legacy path).
    key_sources = []
    if ssh_key and "BEGIN" in ssh_key and "PRIVATE KEY" in ssh_key:
        key_sources = [("content", ssh_key)]
    elif ssh_key and os.path.exists(ssh_key):
        key_sources = [("file", ssh_key)]
    key_classes = (paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey)
    for src_type, src in key_sources:
        for key_cls in key_classes:
            try:
                if src_type == "file":
                    key = key_cls.from_private_key_file(src)
                else:
                    key = key_cls.from_private_key(StringIO(src))
            except Exception:
                # Wrong key type for this loader: try the next one.
                continue
            client = _open_ssh_client(target, ssh_user, pkey=key)
            if client:
                return client

    # 2. Password fallback from the settings.
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            pwd_user = get_secret(db, "ssh_pwd_default_user") or ssh_user
            pwd_pass = get_secret(db, "ssh_pwd_default_pass") or ""
        finally:
            # Release the session even if a secret lookup fails.
            db.close()
    except Exception:
        return None
    if pwd_pass:
        return _open_ssh_client(target, pwd_user, password=pwd_pass)
    return None
def _run(client, cmd):
try:
# Test root vs sudo
_, stdout, _ = client.exec_command("id -u", timeout=5)
uid = stdout.read().decode().strip()
if uid == "0":
full = cmd
else:
escaped = cmd.replace("'", "'\"'\"'")
full = f"sudo bash -c '{escaped}'"
_, stdout, stderr = client.exec_command(full, timeout=15)
out = stdout.read().decode("utf-8", errors="replace").strip()
err = stderr.read().decode("utf-8", errors="replace").strip()
# Fallback sans sudo si sudoers refuse (detection robuste case/accent insensible)
SUDO_KW = ["pas autoris", "non autoris", "not allowed to execute",
"is not allowed", "no tty present", "sudo:"]
err_low = err.lower()
sudo_refused = any(kw in err_low for kw in SUDO_KW)
if (not out) and err and sudo_refused:
_, stdout, stderr = client.exec_command(cmd, timeout=15)
out = stdout.read().decode("utf-8", errors="replace").strip()
err2 = stderr.read().decode("utf-8", errors="replace").strip()
err2_low = err2.lower()
still_sudo_err = any(kw in err2_low for kw in SUDO_KW)
if still_sudo_err:
err = err2
else:
# Retry sans sudo a abouti (sortie vide acceptable)
err = err2 if err2 else ""
if not out and not err:
out = "" # explicite : pas de containers / pas de services failed = OK
result = out if out else err
lines = [l for l in result.splitlines() if not any(b in l for b in BANNER_FILTERS) and l.strip()]
return "\n".join(lines).strip()
except Exception as e:
return f"ERROR: {e}"
def audit_single_server(hostname):
    """Audit one server over SSH and return the findings as a dict.

    The dict always holds ``hostname``, ``audit_date`` and ``status``. When
    resolution or connection fails, ``status`` is ``CONNECTION_FAILED`` and
    ``connection_method`` explains why. On success it holds one entry per
    AUDIT_CMDS key plus the derived flags ``qualys_active``,
    ``sentinelone_active`` and ``disk_alert`` (a usage of 90% or more).
    """
    report = {
        "hostname": hostname,
        "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M"),
        "status": "PENDING",
    }

    fqdn = _resolve(hostname)
    if not fqdn:
        report["status"] = "CONNECTION_FAILED"
        report["connection_method"] = f"DNS: aucun suffixe résolu ({hostname})"
        report["resolved_fqdn"] = None
        return report
    report["resolved_fqdn"] = fqdn

    session = _connect(fqdn, hostname)
    if not session:
        report["status"] = "CONNECTION_FAILED"
        report["connection_method"] = f"SSH: connexion refusée ({fqdn})"
        return report

    report["status"] = "OK"
    _, login = _get_ssh_settings()
    report["connection_method"] = f"ssh_key ({login}@{fqdn})"

    # Collect every audit item over the open session.
    for field, command in AUDIT_CMDS.items():
        report[field] = _run(session, command)
    try:
        session.close()
    except Exception:
        pass

    # Post-processing: derive the agent flags from the "agents" output.
    agent_lines = report.get("agents", "")
    report["qualys_active"] = "qualys" in agent_lines and "active" in agent_lines
    report["sentinelone_active"] = "sentinelone" in agent_lines and "active" in agent_lines

    # Post-processing: raise the disk alert when a line's first "%" token is >= 90.
    report["disk_alert"] = False
    for row in (report.get("disk_space") or "").split("\n"):
        percent_tokens = [tok for tok in row.split() if "%" in tok]
        if not percent_tokens:
            continue
        try:
            usage = int(percent_tokens[0].replace("%", ""))
        except ValueError:
            continue
        if usage >= 90:
            report["disk_alert"] = True
    return report
def audit_servers_list(hostnames):
    """Audit each hostname in turn (whitespace-stripped) and return the list of result dicts."""
    return [audit_single_server(name.strip()) for name in hostnames]
# ═══════════════════════════════════════════════
# Background audit job manager
# ═══════════════════════════════════════════════
import threading
import uuid
import time as _time
# In-memory registry of background audit jobs, keyed by job id.
_audit_jobs = {}
def start_audit_job(hostnames, parallel=3):
    """Start a background audit over ``hostnames`` and return its job id.

    The job is registered in ``_audit_jobs`` and processed by a daemon thread
    that feeds a bounded thread pool (``parallel`` workers, at least 1).
    Hostnames are stripped once up front so that the per-server progress
    entries use the same key the workers receive.

    Returns:
        str: the 8-character job id.
    """
    from concurrent.futures import ThreadPoolExecutor

    # Normalise once: progress entries and workers must agree on the key.
    targets = [hn.strip() for hn in hostnames]
    job_id = str(uuid.uuid4())[:8]
    job = {
        "id": job_id,
        "started_at": _time.time(),
        "total": len(targets),
        "done": 0,
        "servers": {},
        "results": [],
        "finished": False,
        "parallel": parallel,
    }
    for hn in targets:
        job["servers"][hn] = {"hostname": hn, "stage": "pending", "detail": "En attente", "status": None}
    _audit_jobs[job_id] = job

    def _audit_safely(hn):
        # An exception inside a pool task would otherwise be swallowed by the
        # future, leaving the server stuck in its last stage forever.
        try:
            _audit_one(job, hn)
        except Exception as exc:
            entry = job["servers"].setdefault(hn, {"hostname": hn})
            entry["stage"] = "failed"
            entry["detail"] = f"Erreur interne: {exc}"
            entry["status"] = "ERROR"
            job["results"].append({"hostname": hn, "status": "ERROR",
                                   "connection_method": None, "resolved_fqdn": None,
                                   "error_msg": str(exc)})
            job["done"] += 1

    def _worker():
        # Leaving the "with" block waits for every submitted audit to finish.
        with ThreadPoolExecutor(max_workers=max(1, int(parallel))) as pool:
            for hn in targets:
                pool.submit(_audit_safely, hn)
        job["finished"] = True
        job["finished_at"] = _time.time()

    threading.Thread(target=_worker, daemon=True).start()
    return job_id
# Serialises updates of the shared job counters: _audit_one runs concurrently
# in pool threads and "job['done'] += 1" is not an atomic operation.
_audit_progress_lock = threading.Lock()


def _audit_one(job, hostname):
    """Audit one server for a background job and record progress in ``job``.

    Updates ``job["servers"][hostname]`` (stage / detail / status) as the
    audit moves through resolving, connecting and auditing, then appends the
    result dict to ``job["results"]`` and increments ``job["done"]``.
    """
    # Tolerate a missing progress entry instead of failing with KeyError.
    state = job["servers"].setdefault(
        hostname,
        {"hostname": hostname, "stage": "pending", "detail": "En attente", "status": None},
    )

    def _record(result, stage, detail):
        # Publish the final state of this server and bump the job counters.
        state["stage"] = stage
        state["detail"] = detail
        state["status"] = result["status"]
        with _audit_progress_lock:
            job["results"].append(result)
            job["done"] += 1

    state["stage"] = "resolving"
    state["detail"] = "Résolution DNS"
    target = _resolve(hostname)
    if not target:
        _record(
            {"hostname": hostname, "status": "CONNECTION_FAILED",
             "connection_method": f"DNS: aucun suffixe résolu ({hostname})",
             "resolved_fqdn": None},
            stage="failed", detail="DNS: aucun suffixe résolu",
        )
        return

    state["stage"] = "connecting"
    state["detail"] = f"Connexion SSH → {target}"
    client = _connect(target, hostname)
    if not client:
        _record(
            {"hostname": hostname, "status": "CONNECTION_FAILED",
             "connection_method": f"SSH: connexion refusée ({target})",
             "resolved_fqdn": target},
            stage="failed", detail=f"SSH refusé ({target})",
        )
        return

    state["stage"] = "auditing"
    state["detail"] = "Collecte des données"
    result = {"hostname": hostname, "status": "OK", "resolved_fqdn": target,
              "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M")}
    try:
        _, ssh_user = _get_ssh_settings()
        result["connection_method"] = f"ssh_key ({ssh_user}@{target})"
        for key, cmd in AUDIT_CMDS.items():
            result[key] = _run(client, cmd)
    finally:
        # Always release the SSH session, even if collection is interrupted.
        try:
            client.close()
        except Exception:
            pass

    # Post-processing: agent flags derived from the "agents" output.
    agents = result.get("agents", "")
    result["qualys_active"] = "qualys" in agents and "active" in agents
    result["sentinelone_active"] = "sentinelone" in agents and "active" in agents

    # Post-processing: disk alert when a line's first "%" token is >= 90.
    result["disk_alert"] = False
    for line in (result.get("disk_space") or "").split("\n"):
        pcts = [p for p in line.split() if "%" in p]
        if not pcts:
            continue
        try:
            if int(pcts[0].replace("%", "")) >= 90:
                result["disk_alert"] = True
        except ValueError:
            pass

    _record(result, stage="success", detail=result.get("os_release", "OK"))
def get_audit_job(job_id):
    """Return the background job registered under ``job_id``, or None if unknown."""
    job = _audit_jobs.get(job_id)
    return job
def list_audit_jobs():
    """Return the registered jobs started less than one hour (3600 s) ago, keyed by job id."""
    now = _time.time()
    recent = {}
    for job_id, job in _audit_jobs.items():
        if now - job["started_at"] < 3600:
            recent[job_id] = job
    return recent
def save_audit_to_db(db, results):
    """Persist audit results: update or insert ``server_audit`` rows.

    For each result with a hostname, the matching ``servers`` row is looked up
    by short name. An existing ``server_audit`` row for that server is
    updated, otherwise a new row is inserted. For successful audits of known
    servers, ``servers.os_version`` / ``servers.fqdn`` and ``server_ips`` are
    refreshed as well. A single commit is issued at the end.

    Returns:
        tuple: ``(updated, inserted)`` row counts.
    """
    update_sql = (
        "\n"
        "UPDATE server_audit SET\n"
        "status = :st, connection_method = :cm, resolved_fqdn = :rf,\n"
        "os_release = :os, kernel = :k, uptime = :up, selinux = :se,\n"
        "disk_detail = :dd, disk_alert = :da,\n"
        "apps_installed = :ai, services_running = :sr,\n"
        "running_not_enabled = :rne, listening_ports = :lp,\n"
        "db_detected = :db, cluster_detected = :cl, containers = :co,\n"
        "agents = :ag, qualys_active = :qa, sentinelone_active = :s1,\n"
        "failed_services = :fs, audit_date = :ad\n"
        "WHERE id = :id\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO server_audit (server_id, hostname, audit_date, status, connection_method,\n"
        "resolved_fqdn, os_release, kernel, uptime, selinux, disk_detail, disk_alert,\n"
        "apps_installed, services_running, running_not_enabled, listening_ports,\n"
        "db_detected, cluster_detected, containers, agents, qualys_active,\n"
        "sentinelone_active, failed_services)\n"
        "VALUES (:sid, :hn, :ad, :st, :cm, :rf, :os, :k, :up, :se, :dd, :da,\n"
        ":ai, :sr, :rne, :lp, :db, :cl, :co, :ag, :qa, :s1, :fs)\n"
    )

    n_updated = 0
    n_inserted = 0
    for entry in results:
        hostname = entry.get("hostname", "")
        if not hostname:
            continue

        # Locate the server by its short name.
        server_row = db.execute(
            text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
            {"h": hostname.split(".")[0]},
        ).fetchone()
        server_id = server_row.id if server_row else None
        stamp = datetime.now()

        # Bind parameters shared by the UPDATE and the INSERT.
        common = {
            "st": entry.get("status"), "cm": entry.get("connection_method"),
            "rf": entry.get("resolved_fqdn"), "os": entry.get("os_release"),
            "k": entry.get("kernel"), "up": entry.get("uptime"),
            "se": entry.get("selinux"), "dd": entry.get("disk_space"),
            "da": entry.get("disk_alert", False), "ai": entry.get("apps_installed"),
            "sr": entry.get("services_running"), "rne": entry.get("running_not_enabled"),
            "lp": entry.get("listening_ports"), "db": entry.get("db_detect"),
            "cl": entry.get("cluster_detect"), "co": entry.get("containers"),
            "ag": entry.get("agents", ""), "qa": entry.get("qualys_active", False),
            "s1": entry.get("sentinelone_active", False), "fs": entry.get("failed_services"),
            "ad": stamp,
        }

        # Upsert: only known servers can have an existing audit row.
        previous = None
        if server_id:
            previous = db.execute(text(
                "SELECT id FROM server_audit WHERE server_id = :sid AND server_id IS NOT NULL"
            ), {"sid": server_id}).fetchone()
        if previous:
            db.execute(text(update_sql), {**common, "id": previous.id})
            n_updated += 1
        else:
            db.execute(text(insert_sql), {**common, "sid": server_id, "hn": hostname})
            n_inserted += 1

        # Refresh the servers table only for successful audits of known servers.
        if not (server_id and entry.get("status") == "OK"):
            continue
        resolved = entry.get("resolved_fqdn", "")
        # Resolve the IP from the FQDN.
        ip_addr = None
        if resolved:
            try:
                ip_addr = socket.gethostbyname(resolved)
            except Exception:
                pass
        from .itop_service import _normalize_os_for_itop
        changes = {}
        if entry.get("os_release"):
            changes["os_version"] = _normalize_os_for_itop(entry["os_release"].strip())
        if ip_addr:
            changes["fqdn"] = resolved
        if changes:
            assignments = ", ".join(f"{col} = :{col}" for col in changes)
            changes["sid"] = server_id
            db.execute(text(f"UPDATE servers SET {assignments}, updated_at = NOW() WHERE id = :sid"), changes)
        # Record the IP in server_ips when it is not there yet.
        if ip_addr:
            known_ip = db.execute(text(
                "SELECT id FROM server_ips WHERE server_id = :sid AND ip_address = :ip"
            ), {"sid": server_id, "ip": ip_addr}).fetchone()
            if not known_ip:
                db.execute(text(
                    "INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'audit')"
                ), {"sid": server_id, "ip": ip_addr})
    db.commit()
    return n_updated, n_inserted
# ===========================================================================
# AUDIT CIBLE QUALYS AGENT — pour bouton "Check" sur page Agents inactifs
# Utilise la meme mecanique de connexion que audit_single_server (DB-driven)
# ===========================================================================
# Shell snippets for the targeted Qualys agent audit. audit_qualys_agent_only()
# joins them into ONE script, so a snippet must never call "exit": that would
# end the whole script and drop every later section. "agent_log" therefore
# uses a flag + "break" instead of "exit 0".
QUALYS_AGENT_CMDS = {
    "os_release": "cat /etc/redhat-release 2>/dev/null || (grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2) || uname -sr",
    # Service status through whichever init system is available.
    "agent_status": (
        "if command -v systemctl >/dev/null 2>&1; then "
        "  systemctl status qualys-cloud-agent --no-pager 2>&1 | head -25; "
        "elif [ -x /etc/init.d/qualys-cloud-agent ]; then "
        "  /etc/init.d/qualys-cloud-agent status 2>&1 | head -25; "
        "elif command -v service >/dev/null 2>&1; then "
        "  service qualys-cloud-agent status 2>&1 | head -25; "
        "else "
        "  echo '--- ps (init system inconnu) ---'; "
        "  ps -ef 2>/dev/null | grep -i qualys-cloud-agent | grep -v grep | head -5 || echo 'aucun process Qualys'; "
        "fi"
    ),
    "agent_version": (
        "(rpm -q qualys-cloud-agent 2>/dev/null) || "
        "(dpkg -l qualys-cloud-agent 2>/dev/null | awk '/^ii/{print $2,$3}') || "
        "(/usr/local/qualys/cloud-agent/bin/qualys-cloud-agent.sh -v 2>&1) || "
        "echo 'version introuvable'"
    ),
    # Tail of the first readable agent log among the known locations.
    "agent_log": (
        "FOUND_LOG=0; "
        "for f in /var/log/qualys/qualys-cloud-agent.log "
        "/var/log/qualys-cloud-agent/qualys-cloud-agent.log "
        "/usr/local/qualys/cloud-agent/log/qualys-cloud-agent.log "
        "/var/log/qualysagent/qualysagent.log; do "
        "  if [ -e \"$f\" ]; then "
        "    out=$(tail -50 \"$f\" 2>/dev/null || sudo -n tail -50 \"$f\" 2>/dev/null); "
        "    if [ -n \"$out\" ]; then echo \"=== $f ===\"; echo \"$out\"; FOUND_LOG=1; break; fi; "
        "    echo \"=== $f (existe mais non lisible — sudo refuse) ===\"; "
        "  fi; "
        "done; "
        "if [ $FOUND_LOG -eq 0 ]; then "
        "echo 'log Qualys introuvable. Chemins testes: /var/log/qualys/*, /var/log/qualys-cloud-agent/*, /usr/local/qualys/cloud-agent/log/*, /var/log/qualysagent/*'; "
        "fi"
    ),
    "disk_space": (
        "echo '=== Disque global ==='; "
        "df -h 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem|overlay|/dev/loop)' | head -15; "
        "echo; echo '=== /var/log (partition agent) ==='; "
        "df -h /var/log 2>/dev/null | tail -1; "
        "echo; echo '=== Top 5 dossiers /var/log ==='; "
        "(du -sh /var/log/* 2>/dev/null | sort -rh | head -5) || (sudo -n du -sh /var/log/* 2>/dev/null | sort -rh | head -5) || echo '(non lisible)'"
    ),
    "qualys_connectivity": (
        # Minimal test: any HTTP code means connectivity is OK; otherwise run the detailed diagnosis.
        "URL=https://qagpublic.qg1.apps.qualys.eu/Qlys/CloudAgent/status; "
        "if command -v curl >/dev/null 2>&1; then "
        "  CODE=$(curl --connect-timeout 5 -sS -o /dev/null -w '%{http_code}' \"$URL\" 2>/dev/null); "
        "  if [ -n \"$CODE\" ] && [ \"$CODE\" != \"000\" ]; then "
        "    echo \"✓ Connectivité OK (HTTP $CODE depuis $URL)\"; "
        "  else "
        "    echo '✗ Connectivité KO — diag détaillé :'; echo; "
        "    echo '--- DNS ---'; "
        "    (getent hosts qagpublic.qg1.apps.qualys.eu 2>/dev/null || nslookup qagpublic.qg1.apps.qualys.eu 2>/dev/null | tail -3) || echo 'DNS KO'; "
        "    echo '--- curl verbeux ---'; "
        "    curl --connect-timeout 5 -v -sS -o /dev/null \"$URL\" 2>&1 | grep -E 'Trying|Connected|connect|Could not|refused|timed out|verify|SSL' | head -10; "
        "  fi; "
        "else echo '(curl absent — impossible de tester)'; fi"
    ),
    "lvm_info": (
        "echo '=== Volume Groups (espace libre dans le VG) ==='; "
        "(sudo -n vgs --noheadings --units g -o vg_name,vg_size,vg_free 2>/dev/null || "
        " vgs --noheadings --units g -o vg_name,vg_size,vg_free 2>/dev/null) | head -10 || echo '(pas LVM ou commande non autorisee)'; "
        "echo; echo '=== Logical Volumes (filtre log/var) ==='; "
        "(sudo -n lvs --noheadings --units g -o lv_name,vg_name,lv_size,lv_attr 2>/dev/null || "
        " lvs --noheadings --units g -o lv_name,vg_name,lv_size,lv_attr 2>/dev/null) | grep -iE 'log|var' || echo '(pas de LV log/var ou non lisible)'; "
        "echo; echo '=== FS type sur /var/log ==='; "
        "(stat -f -c '%T' /var/log 2>/dev/null) || (df -T /var/log 2>/dev/null | awk 'NR==2{print $2}') || echo '(stat KO)'"
    ),
    "logrotate_config": (
        "FOUND=0; "
        "for f in /etc/logrotate.d/qualys-cloud-agent /etc/logrotate.d/qualys "
        "/etc/logrotate.d/qualysagent; do "
        "  if [ -e \"$f\" ]; then echo \"=== $f ===\"; (cat \"$f\" 2>/dev/null || sudo -n cat \"$f\" 2>/dev/null); FOUND=1; fi; "
        "done; "
        "if [ $FOUND -eq 0 ]; then echo '(pas de config logrotate dediee Qualys — l agent gere ses logs en interne)'; fi; "
        "echo; echo '=== /etc/qualys/cloud-agent/qagent-log.conf ==='; "
        "(cat /etc/qualys/cloud-agent/qagent-log.conf 2>/dev/null || sudo -n cat /etc/qualys/cloud-agent/qagent-log.conf 2>/dev/null) || echo '(non trouve / non lisible)'"
    ),
    "system_log": (
        "if command -v journalctl >/dev/null 2>&1; then "
        "  out=$(journalctl -u qualys-cloud-agent --no-pager -n 50 2>/dev/null || sudo -n journalctl -u qualys-cloud-agent --no-pager -n 50 2>/dev/null); "
        "  if [ -n \"$out\" ]; then echo \"$out\"; else echo '(journalctl: aucune entree ou non autorise)'; fi; "
        "elif [ -e /var/log/messages ]; then "
        "  echo '--- /var/log/messages (filtre qualys, 50 derniers) ---'; "
        "  out=$(grep -i qualys /var/log/messages 2>/dev/null | tail -50 || sudo -n grep -i qualys /var/log/messages 2>/dev/null | tail -50); "
        "  if [ -n \"$out\" ]; then echo \"$out\"; else echo '(aucune entree qualys ou sudo refuse)'; fi; "
        "elif [ -e /var/log/syslog ]; then "
        "  echo '--- /var/log/syslog (filtre qualys, 50 derniers) ---'; "
        "  out=$(grep -i qualys /var/log/syslog 2>/dev/null | tail -50 || sudo -n grep -i qualys /var/log/syslog 2>/dev/null | tail -50); "
        "  if [ -n \"$out\" ]; then echo \"$out\"; else echo '(aucune entree qualys ou sudo refuse)'; fi; "
        "else "
        "  echo 'logs systeme indisponibles (journalctl absent, messages/syslog non trouves)'; "
        "fi"
    ),
}
import threading as _threading
# Per-hostname state of the asynchronous Qualys audits:
# hostname -> {status, result, started_at, finished_at, error}
_qualys_audit_cache = {}
# Guards every read and write of _qualys_audit_cache.
_qualys_audit_lock = _threading.Lock()
def _analyze_qualys_audit(r):
"""Analyse les sorties d'audit pour suggerer des resolutions concretes.
Retourne liste de {severity: critical|high|medium|low, title, fix}."""
import re
suggestions = []
s_status = (r.get("agent_status") or "").lower()
s_log = (r.get("agent_log") or "").lower()
s_sys = (r.get("system_log") or "").lower()
s_disk = (r.get("disk_space") or "")
s_conn = (r.get("qualys_connectivity") or "").lower()
s_ver = (r.get("agent_version") or "")
s_lvm = (r.get("lvm_info") or "")
s_lrt = (r.get("logrotate_config") or "").lower()
# Disque saturé / agent ne peut écrire
disk_full = " 100%" in s_disk or "no space left" in (s_log + s_sys)
if disk_full:
suggestions.append({
"severity": "critical",
"title": "Partition /var/log saturée",
"fix": "Cause : disque 100% rempli, l'agent ne peut plus écrire ses logs et crashe.\n"
"Pistes possibles : cleanup logs anciens (.log.0/1/2 du crash loop), "
"extension du FS si LVM avec espace libre dans le VG (voir bloc LVM ci-dessous)."
})
# Si LVM avec free dans VG -> note diagnostic
m_vg = re.search(r"(\S+)\s+([\d.]+)g\s+([\d.]+)g", s_lvm.lower())
if m_vg and float(m_vg.group(3)) > 0.5:
vg_name = m_vg.group(1)
free_gb = float(m_vg.group(3))
suggestions.append({
"severity": "high",
"title": f"LVM : extension FS possible (VG {vg_name} a {free_gb}G libres)",
"fix": f"Le VG {vg_name} dispose de {free_gb}G non alloués → extension du LV /var/log faisable. "
f"⚠ Snapshot vCenter de la VM obligatoire avant toute action de redimensionnement."
})
if "cannot write file" in s_sys or "logger initialization failed" in s_sys:
suggestions.append({
"severity": "critical",
"title": "Agent ne peut pas écrire son log",
"fix": "Cause : permissions /var/log/qualys cassées, ou disque saturé. "
"L'agent fail-fast à l'init du logger."
})
# Crash loop
m = re.search(r"restart counter is at (\d+)", s_sys)
if m and int(m.group(1)) > 50:
suggestions.append({
"severity": "high",
"title": f"Crash loop ({m.group(1)} restarts depuis la dernière stabilité)",
"fix": "Le service est en boucle de redémarrage permanente. "
"À stopper temporairement (sudo systemctl stop qualys-cloud-agent) "
"le temps que le problème racine soit traité — sinon il pollue les logs et stresse le système."
})
# Connectivité KO
if any(k in s_conn for k in ["connexion directe echec", "connection refused", "timed out",
"could not resolve", "no route", "unreachable", "dns ko"]):
suggestions.append({
"severity": "high",
"title": "Connectivité Qualys cloud KO (flux direct bloqué)",
"fix": "L'agent doit joindre qagpublic.qg1.apps.qualys.eu:443 en direct (pas via proxy). "
"Le test échoue → flux 443 sortant probablement bloqué côté firewall périmétrique. "
"Comparer avec un serveur où l'agent fonctionne pour confirmer "
"que les mêmes règles de flux sont appliquées."
})
if "certificate verify failed" in s_conn or ("ssl" in s_conn and "verify" in s_conn):
suggestions.append({
"severity": "high",
"title": "Erreur TLS/SSL",
"fix": "Cause possible : interception SSL par un proxy MITM, ou bundle CA système obsolète. "
"À investiguer avec l'équipe réseau/sécu."
})
# Service désactivé / arrêté
if "masked" in s_status:
suggestions.append({
"severity": "medium",
"title": "Service masked",
"fix": "Le service Qualys a été explicitement masqué. À unmask + enable une fois la cause identifiée."
})
elif "disabled" in s_status:
suggestions.append({
"severity": "medium",
"title": "Service disabled au boot",
"fix": "Le service ne démarrera pas au prochain reboot. "
"Soit voulu (serveur en décom), soit oubli post-intervention — à clarifier."
})
elif any(k in s_status for k in ["inactive (dead)", "stopped", "not running"]) \
and "active" not in s_status:
suggestions.append({
"severity": "medium",
"title": "Service arrêté",
"fix": "L'agent n'est plus en cours d'exécution. Voir les logs ci-dessous pour la cause de l'arrêt."
})
# Agent obsolète
if re.match(r"^qualys-cloud-agent-([0-5]\.|6\.[01]\.)", s_ver):
suggestions.append({
"severity": "low",
"title": f"Agent obsolète ({s_ver.strip()})",
"fix": "Version ancienne, plus supportée par Qualys. "
"À planifier en upgrade vers 7.x dans le cadre des MAJ de l'agent."
})
# Logrotate Qualys mal configuré
if s_lrt and "qualys" in s_lrt:
if "compress" in s_lrt and ("nocompress" in s_lrt or
not re.search(r"^\s*compress\s*$", s_lrt, re.MULTILINE)):
suggestions.append({
"severity": "medium",
"title": "Logrotate Qualys : compression désactivée",
"fix": "Sans compression, les logs archivés occupent 5-10× plus de place — "
"contribution potentielle à la saturation de /var/log."
})
if "rotate" not in s_lrt:
suggestions.append({
"severity": "low",
"title": "Logrotate Qualys : pas de directive rotate",
"fix": "Pas de politique de rétention définie → les logs s'accumulent indéfiniment."
})
# Core dump + package absent = installation incomplète / corrompue
if ("core-dump" in s_sys or "core dumped" in s_sys or "abrt" in s_sys.lower()) and \
("introuvable" in s_ver.lower() or s_ver.lower().startswith("version introuvable")):
suggestions.append({
"severity": "critical",
"title": "Installation Qualys cassée (core dump + package absent du RPM)",
"fix": "Le service systemd existe mais le binaire est manquant ou corrompu — "
"core dump systématique au démarrage. Désinstallation incomplète probable. "
"Nécessite cleanup complet (service unit + fichiers) puis réinstallation "
"via le RPM SANEF (/root/QualysCloudAgent.rpm) avec ré-activation."
})
elif "core-dump" in s_sys or "core dumped" in s_sys:
suggestions.append({
"severity": "high",
"title": "Agent Qualys core dumps en boucle",
"fix": "Le binaire crashe au démarrage (signal SIGABRT). "
"Cause possible : lib système cassée (ldd manquant), bug version agent vs OS, "
"ou conflit avec autre package."
})
# OS EOL (RHEL 5/6)
s_os = (r.get("os_release") or "").lower()
if "release 5" in s_os or "release 6" in s_os:
suggestions.append({
"severity": "low",
"title": "OS en fin de vie",
"fix": "RHEL 5/6 EOL — l'agent Qualys 7.x n'est plus supporté sur ces versions. "
"À traiter dans le plan de migration / décom du serveur."
})
return suggestions
def start_qualys_audit_async(hostname, force=False):
    """Run ``audit_qualys_agent_only(hostname)`` in a daemon thread.

    Unless ``force`` is set, a run still marked "pending" that started less
    than 120 seconds ago is reused and nothing new is launched.

    Returns:
        bool: True when a new run was started, False when a recent pending
        run was reused.
    """
    with _qualys_audit_lock:
        current = _qualys_audit_cache.get(hostname)
        is_pending = bool(current) and current.get("status") == "pending"
        if is_pending and not force:
            elapsed = (datetime.now() - current["started_at"]).total_seconds()
            if elapsed < 120:
                return False
        _qualys_audit_cache[hostname] = {
            "status": "pending",
            "result": None,
            "started_at": datetime.now(),
            "finished_at": None,
            "error": None,
        }

    def _publish(**fields):
        # Merge the outcome into the cached state and stamp the end time.
        with _qualys_audit_lock:
            entry = _qualys_audit_cache.get(hostname, {})
            entry.update(fields)
            entry["finished_at"] = datetime.now()
            _qualys_audit_cache[hostname] = entry

    def _runner():
        try:
            outcome = audit_qualys_agent_only(hostname)
            _publish(status="ok", result=outcome)
        except Exception as ex:
            _publish(status="error", error=str(ex))

    worker = _threading.Thread(target=_runner, daemon=True)
    worker.start()
    return True
def get_qualys_audit_state(hostname):
    """Return a shallow copy of the cached Qualys audit state for ``hostname``, or None if there is none."""
    with _qualys_audit_lock:
        snapshot = dict(_qualys_audit_cache.get(hostname, {}))
    if snapshot:
        return snapshot
    return None
def audit_qualys_agent_only(hostname):
    """Run the targeted Qualys agent audit (service status, version, logs...).

    Uses the same ``_resolve`` / ``_connect`` mechanics as
    ``audit_single_server``. All QUALYS_AGENT_CMDS snippets are sent as one
    script over a single SSH channel, delimited by section markers.

    Returns:
        dict: ``hostname``, ``audit_date``, ``status`` (``OK``,
        ``CONNECTION_FAILED`` or ``ERROR``), ``connection_method``,
        ``resolved_fqdn``, one entry per QUALYS_AGENT_CMDS key and
        ``suggestions``. ``error_msg`` is set when ``status`` is ``ERROR``.
    """
    result = {
        "hostname": hostname,
        "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "status": "PENDING",
        "connection_method": None,
        "resolved_fqdn": None,
    }
    for k in QUALYS_AGENT_CMDS:
        result[k] = None
    target = _resolve(hostname)
    if not target:
        result["status"] = "CONNECTION_FAILED"
        result["connection_method"] = f"DNS: aucun suffixe resolu ({hostname})"
        return result
    result["resolved_fqdn"] = target
    client = _connect(target, hostname)
    if not client:
        result["status"] = "CONNECTION_FAILED"
        result["connection_method"] = f"SSH: connexion echouee a {target}"
        return result
    method = _resolve_ssh_method(hostname) or "ssh_key"
    result["connection_method"] = f"{method} -> {target}"
    try:
        # Every command goes into ONE bash script with markers, i.e. a single
        # SSH channel. This avoids "Timeout opening channel" on PSMP, which
        # limits the number of channels.
        # Each snippet runs in its own subshell "( ... )" so that an "exit" or
        # a variable set by one section cannot affect the following ones.
        script_parts = []
        for key, cmd in QUALYS_AGENT_CMDS.items():
            script_parts.append(f"echo '__SECTION_{key}_START__'")
            script_parts.append(f"( {cmd} )")
            script_parts.append(f"echo '__SECTION_{key}_END__'")
        combined = "; ".join(script_parts)

        # Longer timeout (60s): the combined script includes a curl with a 5s
        # connect timeout plus several other commands. A failure here is
        # handled by the outer "except" and reported as status ERROR instead
        # of an "OK" audit whose sections all failed to parse.
        _, stdout_chk, _ = client.exec_command("id -u", timeout=5)
        uid = stdout_chk.read().decode().strip()
        if uid == "0":
            full_cmd = combined
        else:
            full_cmd = "sudo bash -c '" + combined.replace("'", "'\"'\"'") + "'"
        _, stdout, stderr = client.exec_command(full_cmd, timeout=60)
        big_out = stdout.read().decode("utf-8", errors="replace")
        err = stderr.read().decode("utf-8", errors="replace")
        if not big_out.strip() and err.strip():
            # sudoers refused: retry once without sudo.
            _, stdout2, _ = client.exec_command(combined, timeout=60)
            big_out = stdout2.read().decode("utf-8", errors="replace")

        # Split the output back into sections using the markers.
        for key in QUALYS_AGENT_CMDS:
            _, found, rest = big_out.partition(f"__SECTION_{key}_START__")
            if found:
                section = rest.partition(f"__SECTION_{key}_END__")[0].strip()
            else:
                section = "(parsing failed)"
            result[key] = section or "(empty)"
        result["status"] = "OK"
    except Exception as e:
        result["status"] = "ERROR"
        result["error_msg"] = str(e)
    finally:
        try:
            client.close()
        except Exception:
            pass
    # Analyse the outputs to suggest resolutions.
    if result["status"] == "OK":
        result["suggestions"] = _analyze_qualys_audit(result)
    else:
        result["suggestions"] = []
    return result