patchcenter/app/services/realtime_audit_service.py

994 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Service audit temps reel — lance des checks SSH et retourne les resultats"""
import socket
import json
import re
from datetime import datetime
from sqlalchemy import text
try:
import paramiko
PARAMIKO_OK = True
except ImportError:
PARAMIKO_OK = False
# Fallback private-key path, used when no SSH key is configured in the app secrets.
SSH_KEY_DEFAULT = "/opt/patchcenter/keys/id_ed25519"
# Fallback SSH login, used when no user is configured in the app secrets.
SSH_USER_DEFAULT = "root"
# Timeout in seconds handed to paramiko when connecting / starting a transport.
SSH_TIMEOUT = 12
# Suffixes appended to a hostname when the "ssh_dns_suffixes" secret is unset or
# unreadable; "" means the hostname is tried as-is.
DNS_SUFFIXES_DEFAULT = ["", ".mpcz.fr", ".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"]
def _get_dns_suffixes():
    """Return the DNS suffixes to try when resolving a hostname.

    The list is read from the comma-separated ``ssh_dns_suffixes`` secret.
    An empty item (e.g. ``",,"``) is kept as ``""`` (hostname tried as-is);
    whitespace-only items are dropped.

    Returns:
        The configured list, or ``DNS_SUFFIXES_DEFAULT`` when the secret is
        empty or when anything goes wrong (import error, DB error, ...).
        The lookup is deliberately best-effort.
    """
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            val = get_secret(db, "ssh_dns_suffixes")
        finally:
            # Always release the session, even when get_secret() raises.
            db.close()
        if val:
            return [x.strip() for x in val.split(",") if x.strip() or x == ""]
    except Exception:
        pass
    return DNS_SUFFIXES_DEFAULT
def _get_ssh_settings():
    """Read the SSH settings from the app secrets stored in the database.

    Returns:
        A ``(key_material, user)`` tuple. ``key_material`` is the value of the
        ``ssh_key_private_key`` secret (PEM content, new style) when set,
        otherwise the ``ssh_key_file`` secret (path, legacy) or
        ``SSH_KEY_DEFAULT``. ``user`` comes from ``ssh_key_default_user``,
        then ``ssh_user``, then ``SSH_USER_DEFAULT``.
        On any error the pair ``(SSH_KEY_DEFAULT, SSH_USER_DEFAULT)`` is returned.
    """
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            # New style: PEM content stored directly in the secret.
            key_material = get_secret(db, "ssh_key_private_key")
            if not key_material:
                key_material = get_secret(db, "ssh_key_file") or SSH_KEY_DEFAULT
            user = (
                get_secret(db, "ssh_key_default_user")
                or get_secret(db, "ssh_user")
                or SSH_USER_DEFAULT
            )
        finally:
            # Always release the session, even when a secret lookup raises.
            db.close()
        return key_material, user
    except Exception:
        return SSH_KEY_DEFAULT, SSH_USER_DEFAULT
# Audit commands (simplified for real-time use): result key -> shell command.
# The audit functions below run every entry through _run() and store the
# filtered output in the result dict under the same key.
AUDIT_CMDS = {
"os_release": "cat /etc/redhat-release 2>/dev/null || grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2",
"kernel": "uname -r",
"uptime": "uptime -p 2>/dev/null || uptime",
"selinux": "getenforce 2>/dev/null || echo N/A",
"disk_space": "df -h --output=target,size,avail,pcent 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem)' | sort",
"apps_installed": "rpm -qa --qf '%{NAME} %{VERSION}\\n' 2>/dev/null | grep -iE 'tomcat|java|jdk|nginx|httpd|haproxy|docker|podman|postgresql|postgres|mysql|mariadb|mongodb|oracle|redis|elasticsearch|splunk|centreon|qualys' | sort -u",
# NOTE(review): in the two entries below, sed 's/.service//' uses an unescaped
# dot, so it matches any character followed by "service" — confirm intended.
"services_running": "systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/.service//' | sort",
"running_not_enabled": "comm -23 <(systemctl list-units --type=service --state=running --no-pager --no-legend 2>/dev/null | grep -vE '(auditd|chronyd|crond|dbus|firewalld|getty|irqbalance|kdump|lvm2|NetworkManager|polkit|postfix|rsyslog|sshd|sssd|systemd|tuned|user@)' | awk '{print $1}' | sed 's/.service//' | sort) <(systemctl list-unit-files --type=service --state=enabled --no-pager --no-legend 2>/dev/null | awk '{print $1}' | sed 's/.service//' | sort) 2>/dev/null || echo none",
"listening_ports": "ss -tlnp 2>/dev/null | grep LISTEN | grep -vE ':22 |:111 |:323 ' | awk '{print $4, $6}' | sort",
"db_detect": "for svc in postgresql mariadbd mysqld mongod redis-server; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:active\"; done; pgrep -x ora_pmon >/dev/null 2>&1 && echo 'oracle:active' || true",
"cluster_detect": "(which pcs 2>/dev/null && pcs status 2>/dev/null | head -3) || (test -f /etc/corosync/corosync.conf && echo 'corosync:present') || echo 'no_cluster'",
"containers": "if which podman >/dev/null 2>&1; then USERS=$(ps aux 2>/dev/null | grep -E 'conmon|podman' | grep -v grep | awk '{print $1}' | sort -u); for U in $USERS; do echo \"=== podman@$U ===\"; su - $U -c 'podman ps -a --format \"table {{.Names}} {{.Status}}\"' 2>/dev/null; done; fi; if which docker >/dev/null 2>&1; then docker ps -a --format 'table {{.Names}} {{.Status}}' 2>/dev/null; fi",
# Only services reported "active" are echoed, as "<service>:active" lines.
"agents": "for svc in qualys-cloud-agent sentinelone zabbix-agent; do state=$(systemctl is-active $svc 2>/dev/null); [ \"$state\" = \"active\" ] && echo \"$svc:$state\"; done",
# NOTE(review): "|| echo none" only runs if the last pipeline stage (head)
# fails, so an empty list presumably yields empty output — confirm.
"failed_services": "systemctl list-units --type=service --state=failed --no-pager --no-legend 2>/dev/null | awk '{print $2}' | head -10 || echo none",
"satellite": "subscription-manager identity 2>/dev/null | grep -i 'org\\|server' || echo 'not_registered'",
}
# Substrings identifying login-banner lines: _run() drops every output line
# that contains one of them (case-sensitive match).
BANNER_FILTERS = [
"GROUPE SANEF", "propriété du Groupe", "accèderait", "emprisonnement",
"Article 323", "code pénal", "Authorized uses only", "CyberArk",
"This session", "session is being",
]
def _ordered_suffixes(hostname):
    """Order the configured DNS suffixes by the second letter of ``hostname``.

    SANEF convention: r=recette (test), p=prod, i=infra. Preferred suffixes
    come first (only those actually configured); every other configured
    suffix follows, in configuration order, as a fallback.
    """
    configured = _get_dns_suffixes()
    env_letter = hostname[1].lower() if len(hostname) > 1 else ""

    if env_letter == "r":
        preferred = [".sanef-rec.fr", ".sanef.groupe", ".sanef.fr"]
    else:
        # "p", "i" and any other letter all share the same priority order.
        preferred = [".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"]

    result = [suffix for suffix in preferred if suffix in configured]
    for suffix in configured:
        if suffix not in result:
            result.append(suffix)
    return result
def _resolve(hostname):
    """Return a connectable name for ``hostname``, or None.

    1. If the ``servers`` table holds a non-empty FQDN for this hostname it is
       returned directly, without any port check (fast path).
    2. Otherwise each suffix from ``_ordered_suffixes`` is appended in turn and
       the first candidate accepting a TCP connection on port 22 within
       2 seconds is returned.

    Database errors are ignored on purpose: the suffix loop is the fallback.
    """
    # 1. FQDN stored in the database: returned as-is.
    try:
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            row = db.execute(text(
                "SELECT fqdn FROM servers WHERE LOWER(hostname)=LOWER(:h) "
                "AND fqdn IS NOT NULL AND fqdn != ''"
            ), {"h": hostname}).fetchone()
        finally:
            # Release the session even when the query fails.
            db.close()
        if row and row.fqdn:
            return row.fqdn
    except Exception:
        pass
    # 2. Fallback: probe port 22 on hostname + each DNS suffix.
    for suffix in _ordered_suffixes(hostname):
        target = hostname + suffix
        try:
            # The context manager closes the socket on every path, including
            # when connect_ex() raises (e.g. the name does not resolve).
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                if sock.connect_ex((target, 22)) == 0:
                    return target
        except Exception:
            continue
    return None
def _connect_via_psmp(target):
    """Connect to ``target`` through the CyberArk PSMP gateway.

    Authentication is keyboard-interactive: every prompt is answered with the
    ``ssh_pwd_default_pass`` secret (Vault password). The PSMP login has the
    form ``<cyberark_user>@<target_user>@<target>``.

    Returns:
        A ``paramiko.SSHClient`` bound to the authenticated transport, or None
        when paramiko is missing, no password is configured, authentication
        fails, or any error occurs.
    """
    if not PARAMIKO_OK:
        return None
    transport = None
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            psmp_host = get_secret(db, "psmp_host") or "psmp.sanef.fr"
            psmp_port = int(get_secret(db, "psmp_port") or "22")
            cyber_user = get_secret(db, "psmp_cyberark_user") or "CYBP01336"
            target_user = get_secret(db, "psmp_target_user") or "cybsecope"
            password = get_secret(db, "ssh_pwd_default_pass") or ""
        finally:
            # Release the session even when a secret lookup raises.
            db.close()
        if not password:
            return None
        username = f"{cyber_user}@{target_user}@{target}"
        transport = paramiko.Transport((psmp_host, psmp_port))
        transport.start_client(timeout=SSH_TIMEOUT)
        # Answer every interactive prompt with the same password.
        transport.auth_interactive(username, lambda t, i, p: [password] * len(p))
        if transport.is_authenticated():
            client = paramiko.SSHClient()
            # SSHClient has no public way to adopt an existing transport, so
            # the private attribute is set directly (exec_command uses it).
            client._transport = transport
            return client
    except Exception:
        pass
    # Failure path: do not leave a half-open transport (socket + thread) behind.
    if transport is not None:
        try:
            transport.close()
        except Exception:
            pass
    return None
def _resolve_ssh_method(hostname):
    """Return the ``ssh_method`` configured for the server, or None.

    The lookup uses the short name (text before the first dot) and is
    case-insensitive. Values mentioned by the original author: ``ssh_psmp``,
    ``ssh_key``, ``ssh_password``. None is returned when the server is
    unknown or when the lookup fails.
    """
    try:
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            row = db.execute(text(
                "SELECT ssh_method FROM servers WHERE LOWER(hostname)=LOWER(:h)"
            ), {"h": hostname.split(".")[0]}).fetchone()
        finally:
            # Release the session even when the query fails.
            db.close()
        return row.ssh_method if row else None
    except Exception:
        return None
def _connect(target, hostname=None):
    """Open an SSH connection to ``target`` and return the client, or None.

    Strategy, in order:
      * PSMP gateway, when the server's ``ssh_method`` is ``ssh_psmp``
        (falls through to direct SSH if PSMP fails);
      * private key from the settings (PEM content or legacy file path),
        tried as Ed25519, RSA then ECDSA;
      * password from the ``ssh_pwd_default_*`` secrets.

    Args:
        target: name or address to connect to (port 22).
        hostname: name used to look up ``ssh_method``; defaults to ``target``.

    Returns:
        A connected ``paramiko.SSHClient`` (caller must close it), or None.
    """
    if not PARAMIKO_OK:
        return None
    import os
    from io import StringIO

    def _open_client(**auth):
        # One connection attempt; the client is closed again on failure so
        # that no socket is left dangling.
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification (no MITM protection). Kept as in the original.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            client.connect(target, port=22, timeout=SSH_TIMEOUT,
                           look_for_keys=False, allow_agent=False, **auth)
            return client
        except Exception:
            try:
                client.close()
            except Exception:
                pass
            return None

    # PSMP routing when ssh_method='ssh_psmp' for this server.
    method = _resolve_ssh_method(hostname or target)
    if method == "ssh_psmp":
        client = _connect_via_psmp(target)
        if client:
            return client
        # PSMP failed: fall back to direct SSH below.

    ssh_key, ssh_user = _get_ssh_settings()

    # 1. SSH key from the settings (PEM content, or legacy file path).
    key_is_pem = bool(ssh_key) and "BEGIN" in ssh_key and "PRIVATE KEY" in ssh_key
    key_is_file = (not key_is_pem) and bool(ssh_key) and os.path.exists(ssh_key)
    if key_is_pem or key_is_file:
        for key_cls in (paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey):
            try:
                if key_is_file:
                    key = key_cls.from_private_key_file(ssh_key)
                else:
                    key = key_cls.from_private_key(StringIO(ssh_key))
            except Exception:
                # Not a key of this type (or unreadable): try the next one.
                continue
            client = _open_client(username=ssh_user, pkey=key)
            if client:
                return client

    # 2. Password fallback from the settings.
    try:
        from .secrets_service import get_secret
        from ..database import SessionLocal
        db = SessionLocal()
        try:
            pwd_user = get_secret(db, "ssh_pwd_default_user") or ssh_user
            pwd_pass = get_secret(db, "ssh_pwd_default_pass") or ""
        finally:
            db.close()
    except Exception:
        return None
    if pwd_pass:
        return _open_client(username=pwd_user, password=pwd_pass)
    return None
def _run(client, cmd):
    """Run ``cmd`` on the remote host and return its filtered output.

    The command runs as-is when the session is root (``id -u`` == 0),
    otherwise through ``sudo bash -c``. If sudo is refused (no output and a
    sudo-looking error message) the command is retried without sudo.

    Args:
        client: connected paramiko-style client exposing ``exec_command``.
        cmd: shell command line.

    Returns:
        stdout (or stderr when stdout is empty) with blank lines and login
        banner lines (see ``BANNER_FILTERS``) removed. An empty string is a
        valid result (e.g. no container, no failed service). On any
        exception, the string ``"ERROR: <message>"``.
    """
    # Lower-case fragments identifying a sudo refusal (French/English locales).
    sudo_keywords = ("pas autoris", "non autoris", "not allowed to execute",
                     "is not allowed", "no tty present", "sudo:")

    def _exec(command, timeout):
        # Run one command and return (stdout, stderr), decoded and stripped.
        _, stdout, stderr = client.exec_command(command, timeout=timeout)
        out = stdout.read().decode("utf-8", errors="replace").strip()
        err = stderr.read().decode("utf-8", errors="replace").strip()
        return out, err

    try:
        # The uid cannot change during a session: probe it once per client
        # instead of once per command.
        is_root = getattr(client, "_pc_is_root", None)
        if is_root is None:
            uid, _ = _exec("id -u", 5)
            is_root = uid == "0"
            try:
                client._pc_is_root = is_root
            except AttributeError:
                pass  # client does not accept new attributes: just re-probe next time

        if is_root:
            full = cmd
        else:
            escaped = cmd.replace("'", "'\"'\"'")
            full = f"sudo bash -c '{escaped}'"
        out, err = _exec(full, 15)

        # Fallback without sudo when sudoers refuses. Pointless when already
        # root: the retry would be the very same command.
        if not is_root and not out and err:
            err_low = err.lower()
            if any(kw in err_low for kw in sudo_keywords):
                # Whatever the retry prints replaces the first attempt; an
                # empty result is acceptable.
                out, err = _exec(cmd, 15)

        result = out if out else err
        lines = [
            line for line in result.splitlines()
            if line.strip() and not any(b in line for b in BANNER_FILTERS)
        ]
        return "\n".join(lines).strip()
    except Exception as e:
        return f"ERROR: {e}"
def audit_single_server(hostname):
    """Audit one server over SSH and return a dict of results.

    The dict always holds ``hostname``, ``audit_date`` and ``status``
    (``CONNECTION_FAILED`` or ``OK``). On success it also holds one entry per
    key of ``AUDIT_CMDS`` plus the derived flags ``qualys_active``,
    ``sentinelone_active`` and ``disk_alert`` (some filesystem >= 90% used).

    NOTE(review): ``connection_method`` is always labelled ``ssh_key`` on
    success, although ``_connect`` may have used PSMP or a password.
    """
    report = {
        "hostname": hostname,
        "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M"),
        "status": "PENDING",
    }

    fqdn = _resolve(hostname)
    if not fqdn:
        report.update({
            "status": "CONNECTION_FAILED",
            "connection_method": f"DNS: aucun suffixe résolu ({hostname})",
            "resolved_fqdn": None,
        })
        return report
    report["resolved_fqdn"] = fqdn

    ssh = _connect(fqdn, hostname)
    if not ssh:
        report.update({
            "status": "CONNECTION_FAILED",
            "connection_method": f"SSH: connexion refusée ({fqdn})",
        })
        return report

    report["status"] = "OK"
    _, login = _get_ssh_settings()
    report["connection_method"] = f"ssh_key ({login}@{fqdn})"
    # Commands run sequentially, in AUDIT_CMDS order.
    report.update({name: _run(ssh, command) for name, command in AUDIT_CMDS.items()})
    try:
        ssh.close()
    except Exception:
        pass

    # Post-processing: derived flags.
    agents_out = report.get("agents", "")
    report["qualys_active"] = "qualys" in agents_out and "active" in agents_out
    report["sentinelone_active"] = "sentinelone" in agents_out and "active" in agents_out

    def _nearly_full(df_line):
        # True when the first token containing "%" is an integer >= 90.
        pct_tokens = [tok for tok in df_line.split() if "%" in tok]
        if not pct_tokens:
            return False
        try:
            return int(pct_tokens[0].replace("%", "")) >= 90
        except ValueError:
            return False

    df_lines = (report.get("disk_space") or "").split("\n")
    report["disk_alert"] = any(_nearly_full(line) for line in df_lines)
    return report
def audit_servers_list(hostnames):
    """Audit every hostname sequentially and return the list of result dicts.

    Each name is stripped of surrounding whitespace before being audited;
    results are returned in input order.
    """
    return [audit_single_server(name.strip()) for name in hostnames]
# ═══════════════════════════════════════════════
# Background audit job manager
# ═══════════════════════════════════════════════
import threading
import uuid
import time as _time
# In-memory registry of background audit jobs: job_id -> job state dict.
_audit_jobs = {}


def start_audit_job(hostnames, parallel=3):
    """Start a background audit with a bounded thread pool; return the job id.

    Args:
        hostnames: iterable of server names. Names are stripped once here and
            blank names are dropped, so that the keys of ``job["servers"]``
            are exactly the names handed to the workers.
        parallel: maximum number of servers audited concurrently (at least 1;
            an unusable value falls back to 3).

    Returns:
        The 8-character job id. The job state is stored in ``_audit_jobs``
        and updated in place by the workers; ``finished`` becomes True once
        every server has been processed.
    """
    from concurrent.futures import ThreadPoolExecutor

    names = [hn.strip() for hn in hostnames]
    names = [hn for hn in names if hn]
    # Validate here rather than in the background thread, where a bad value
    # would kill the thread and leave the job unfinished forever.
    try:
        workers = max(1, int(parallel))
    except (TypeError, ValueError):
        workers = 3

    job_id = str(uuid.uuid4())[:8]
    job = {
        "id": job_id,
        "started_at": _time.time(),
        "total": len(names),
        "done": 0,
        "servers": {},
        "results": [],
        "finished": False,
        "parallel": parallel,
    }
    for hn in names:
        job["servers"][hn] = {"hostname": hn, "stage": "pending", "detail": "En attente", "status": None}
    _audit_jobs[job_id] = job

    def _audit_safely(hn):
        # Exceptions raised inside a pool task are otherwise lost (the future
        # is never inspected): record them so the job still reaches `total`.
        try:
            _audit_one(job, hn)
        except Exception as exc:
            detail = f"Erreur interne: {exc}"
            entry = job["servers"].setdefault(hn, {"hostname": hn})
            # Reuses the failure status already produced by the normal paths.
            entry.update(stage="failed", detail=detail, status="CONNECTION_FAILED")
            job["results"].append({"hostname": hn, "status": "CONNECTION_FAILED",
                                   "connection_method": detail, "resolved_fqdn": None})
            job["done"] += 1

    def _job_runner():
        # Named differently from the module-level _run() to avoid confusion.
        try:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                for hn in names:
                    pool.submit(_audit_safely, hn)
        finally:
            job["finished"] = True
            job["finished_at"] = _time.time()

    threading.Thread(target=_job_runner, daemon=True).start()
    return job_id
def _audit_one(job, hostname):
    """Audit one server for a background job, updating ``job`` in place.

    ``job["servers"][hostname]`` tracks the progress (stage: resolving,
    connecting, auditing, then success or failed). The result dict is
    appended to ``job["results"]`` and ``job["done"]`` is incremented exactly
    once, whatever the outcome.

    Args:
        job: job state dict created by ``start_audit_job``.
        hostname: server name (already stripped).
    """
    # setdefault: a name that was not pre-registered (e.g. registered with
    # surrounding whitespace) must not crash the worker with a KeyError.
    state = job["servers"].setdefault(
        hostname,
        {"hostname": hostname, "stage": "pending", "detail": "En attente", "status": None},
    )

    def _fail(detail, connection_method, fqdn):
        # Record a connection failure and count the server as processed.
        state["stage"] = "failed"
        state["detail"] = detail
        state["status"] = "CONNECTION_FAILED"
        job["results"].append({"hostname": hostname, "status": "CONNECTION_FAILED",
                               "connection_method": connection_method, "resolved_fqdn": fqdn})
        job["done"] += 1

    state["stage"] = "resolving"
    state["detail"] = "Résolution DNS"
    target = _resolve(hostname)
    if not target:
        _fail("DNS: aucun suffixe résolu", f"DNS: aucun suffixe résolu ({hostname})", None)
        return

    state["stage"] = "connecting"
    state["detail"] = f"Connexion SSH → {target}"
    client = _connect(target, hostname)
    if not client:
        _fail(f"SSH refusé ({target})", f"SSH: connexion refusée ({target})", target)
        return

    state["stage"] = "auditing"
    state["detail"] = "Collecte des données"
    result = {"hostname": hostname, "status": "OK", "resolved_fqdn": target,
              "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M")}
    try:
        _, ssh_user = _get_ssh_settings()
        # NOTE(review): always labelled "ssh_key", even if _connect used
        # PSMP or a password.
        result["connection_method"] = f"ssh_key ({ssh_user}@{target})"
        for key, cmd in AUDIT_CMDS.items():
            result[key] = _run(client, cmd)
    finally:
        # Close the SSH session even if something unexpected is raised.
        try:
            client.close()
        except Exception:
            pass

    # Post-processing: derived flags.
    agents = result.get("agents", "")
    result["qualys_active"] = "qualys" in agents and "active" in agents
    result["sentinelone_active"] = "sentinelone" in agents and "active" in agents
    result["disk_alert"] = False
    for line in (result.get("disk_space") or "").split("\n"):
        pcts = [p for p in line.split() if "%" in p]
        if pcts:
            try:
                if int(pcts[0].replace("%", "")) >= 90:
                    result["disk_alert"] = True
            except ValueError:
                pass

    state["stage"] = "success"
    state["detail"] = result.get("os_release", "OK")
    state["status"] = "OK"
    job["results"].append(result)
    job["done"] += 1
def get_audit_job(job_id):
    """Return the state dict of job ``job_id``, or None when it is unknown."""
    try:
        return _audit_jobs[job_id]
    except KeyError:
        return None
def list_audit_jobs():
    """Return ``{job_id: job}`` for the jobs started less than one hour ago.

    Older jobs are only hidden from the listing; they stay in the registry.
    """
    current = _time.time()
    recent = {}
    for job_id, job in _audit_jobs.items():
        if current - job["started_at"] < 3600:
            recent[job_id] = job
    return recent
def save_audit_to_db(db, results):
    """Insert or update audit results in the database.

    For each result with a hostname:
      * the matching ``servers`` row is looked up by short name;
      * the ``server_audit`` row of that server is updated if it exists,
        otherwise a new row is inserted (always an insert when the server is
        unknown);
      * when the audit succeeded on a known server, ``servers.os_version`` /
        ``servers.fqdn`` are refreshed and the resolved IP is recorded in
        ``server_ips`` if it is not there yet.

    A single commit is issued at the end.

    Args:
        db: database session exposing ``execute`` and ``commit``.
        results: iterable of audit result dicts.

    Returns:
        ``(updated, inserted)`` counts of ``server_audit`` rows.
    """
    update_sql = (
        "\n"
        "UPDATE server_audit SET\n"
        "status = :st, connection_method = :cm, resolved_fqdn = :rf,\n"
        "os_release = :os, kernel = :k, uptime = :up, selinux = :se,\n"
        "disk_detail = :dd, disk_alert = :da,\n"
        "apps_installed = :ai, services_running = :sr,\n"
        "running_not_enabled = :rne, listening_ports = :lp,\n"
        "db_detected = :db, cluster_detected = :cl, containers = :co,\n"
        "agents = :ag, qualys_active = :qa, sentinelone_active = :s1,\n"
        "failed_services = :fs, audit_date = :ad\n"
        "WHERE id = :id\n"
    )
    insert_sql = (
        "\n"
        "INSERT INTO server_audit (server_id, hostname, audit_date, status, connection_method,\n"
        "resolved_fqdn, os_release, kernel, uptime, selinux, disk_detail, disk_alert,\n"
        "apps_installed, services_running, running_not_enabled, listening_ports,\n"
        "db_detected, cluster_detected, containers, agents, qualys_active,\n"
        "sentinelone_active, failed_services)\n"
        "VALUES (:sid, :hn, :ad, :st, :cm, :rf, :os, :k, :up, :se, :dd, :da,\n"
        ":ai, :sr, :rne, :lp, :db, :cl, :co, :ag, :qa, :s1, :fs)\n"
    )

    n_updated = 0
    n_inserted = 0
    for res in results:
        name = res.get("hostname", "")
        if not name:
            continue

        # Find the server id from the short hostname.
        srv_row = db.execute(
            text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
            {"h": name.split(".")[0]},
        ).fetchone()
        server_id = srv_row.id if srv_row else None

        # Bind parameters shared by the UPDATE and the INSERT statements.
        shared = {
            "ad": datetime.now(),
            "st": res.get("status"),
            "cm": res.get("connection_method"),
            "rf": res.get("resolved_fqdn"),
            "os": res.get("os_release"),
            "k": res.get("kernel"),
            "up": res.get("uptime"),
            "se": res.get("selinux"),
            "dd": res.get("disk_space"),
            "da": res.get("disk_alert", False),
            "ai": res.get("apps_installed"),
            "sr": res.get("services_running"),
            "rne": res.get("running_not_enabled"),
            "lp": res.get("listening_ports"),
            "db": res.get("db_detect"),
            "cl": res.get("cluster_detect"),
            "co": res.get("containers"),
            "ag": res.get("agents", ""),
            "qa": res.get("qualys_active", False),
            "s1": res.get("sentinelone_active", False),
            "fs": res.get("failed_services"),
        }

        # Upsert: an existing audit row is only looked up for a known server.
        previous = None
        if server_id:
            previous = db.execute(text(
                "SELECT id FROM server_audit WHERE server_id = :sid AND server_id IS NOT NULL"
            ), {"sid": server_id}).fetchone()

        if previous:
            db.execute(text(update_sql), {**shared, "id": previous.id})
            n_updated += 1
        else:
            db.execute(text(insert_sql), {**shared, "sid": server_id, "hn": name})
            n_inserted += 1

        # The servers table is only refreshed after a successful audit.
        if not (server_id and res.get("status") == "OK"):
            continue

        fqdn = res.get("resolved_fqdn", "")
        # Resolve the IP address from the FQDN (best effort).
        ip_addr = None
        if fqdn:
            try:
                ip_addr = socket.gethostbyname(fqdn)
            except Exception:
                pass

        from .itop_service import _normalize_os_for_itop
        changes = {}
        if res.get("os_release"):
            changes["os_version"] = _normalize_os_for_itop(res["os_release"].strip())
        if ip_addr:
            changes["fqdn"] = fqdn
        if changes:
            # Column names come from the fixed keys above, never from input.
            assignments = ", ".join(f"{col} = :{col}" for col in changes)
            changes["sid"] = server_id
            db.execute(
                text(f"UPDATE servers SET {assignments}, updated_at = NOW() WHERE id = :sid"),
                changes,
            )

        # Record the IP in server_ips when it is not known yet.
        if ip_addr:
            known_ip = db.execute(text(
                "SELECT id FROM server_ips WHERE server_id = :sid AND ip_address = :ip"
            ), {"sid": server_id, "ip": ip_addr}).fetchone()
            if not known_ip:
                db.execute(text(
                    "INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'audit')"
                ), {"sid": server_id, "ip": ip_addr})

    db.commit()
    return n_updated, n_inserted
# ===========================================================================
# TARGETED QUALYS AGENT AUDIT — for the "Check" button on the inactive agents page
# Uses the same connection mechanics as audit_single_server (DB-driven)
# ===========================================================================
# Result key -> shell command. The keys are the ones read back by
# _analyze_qualys_audit(). Several commands try a plain read first and then
# "sudo -n" (non-interactive) as a fallback.
QUALYS_AGENT_CMDS = {
"os_release": "cat /etc/redhat-release 2>/dev/null || (grep '^PRETTY_NAME=' /etc/os-release 2>/dev/null | cut -d'\"' -f2) || uname -sr",
# Service status through systemd, the init script, `service`, or a ps fallback.
"agent_status": (
"if command -v systemctl >/dev/null 2>&1; then "
" systemctl status qualys-cloud-agent --no-pager 2>&1 | head -25; "
"elif [ -x /etc/init.d/qualys-cloud-agent ]; then "
" /etc/init.d/qualys-cloud-agent status 2>&1 | head -25; "
"elif command -v service >/dev/null 2>&1; then "
" service qualys-cloud-agent status 2>&1 | head -25; "
"else "
" echo '--- ps (init system inconnu) ---'; "
" ps -ef 2>/dev/null | grep -i qualys-cloud-agent | grep -v grep | head -5 || echo 'aucun process Qualys'; "
"fi"
),
# Package version from rpm, dpkg or the agent script; "version introuvable" otherwise.
"agent_version": (
"(rpm -q qualys-cloud-agent 2>/dev/null) || "
"(dpkg -l qualys-cloud-agent 2>/dev/null | awk '/^ii/{print $2,$3}') || "
"(/usr/local/qualys/cloud-agent/bin/qualys-cloud-agent.sh -v 2>&1) || "
"echo 'version introuvable'"
),
# Last 50 lines of the first readable agent log among the known locations.
"agent_log": (
"for f in /var/log/qualys/qualys-cloud-agent.log "
"/var/log/qualys-cloud-agent/qualys-cloud-agent.log "
"/usr/local/qualys/cloud-agent/log/qualys-cloud-agent.log "
"/var/log/qualysagent/qualysagent.log; do "
" if [ -e \"$f\" ]; then "
" out=$(tail -50 \"$f\" 2>/dev/null || sudo -n tail -50 \"$f\" 2>/dev/null); "
" if [ -n \"$out\" ]; then echo \"=== $f ===\"; echo \"$out\"; exit 0; fi; "
" echo \"=== $f (existe mais non lisible — sudo refuse) ===\"; "
" fi; "
"done; "
"echo 'log Qualys introuvable. Chemins testes: /var/log/qualys/*, /var/log/qualys-cloud-agent/*, /usr/local/qualys/cloud-agent/log/*, /var/log/qualysagent/*'"
),
# Global disk usage, the /var/log partition, and the 5 biggest /var/log entries.
"disk_space": (
"echo '=== Disque global ==='; "
"df -h 2>/dev/null | grep -vE '^(tmpfs|devtmpfs|Filesystem|overlay|/dev/loop)' | head -15; "
"echo; echo '=== /var/log (partition agent) ==='; "
"df -h /var/log 2>/dev/null | tail -1; "
"echo; echo '=== Top 5 dossiers /var/log ==='; "
"(du -sh /var/log/* 2>/dev/null | sort -rh | head -5) || (sudo -n du -sh /var/log/* 2>/dev/null | sort -rh | head -5) || echo '(non lisible)'"
),
"qualys_connectivity": (
# Minimal test: any HTTP code received -> connectivity OK. Otherwise run the detailed diagnostic.
"URL=https://qagpublic.qg1.apps.qualys.eu/Qlys/CloudAgent/status; "
"if command -v curl >/dev/null 2>&1; then "
" CODE=$(curl --connect-timeout 5 -sS -o /dev/null -w '%{http_code}' \"$URL\" 2>/dev/null); "
" if [ -n \"$CODE\" ] && [ \"$CODE\" != \"000\" ]; then "
" echo \"✓ Connectivité OK (HTTP $CODE depuis $URL)\"; "
" else "
" echo '✗ Connectivité KO — diag détaillé :'; echo; "
" echo '--- DNS ---'; "
" (getent hosts qagpublic.qg1.apps.qualys.eu 2>/dev/null || nslookup qagpublic.qg1.apps.qualys.eu 2>/dev/null | tail -3) || echo 'DNS KO'; "
" echo '--- curl verbeux ---'; "
" curl --connect-timeout 5 -v -sS -o /dev/null \"$URL\" 2>&1 | grep -E 'Trying|Connected|connect|Could not|refused|timed out|verify|SSL' | head -10; "
" fi; "
"else echo '(curl absent — impossible de tester)'; fi"
),
# Volume groups (name, size, free), log/var logical volumes, FS type of /var/log.
"lvm_info": (
"echo '=== Volume Groups (espace libre dans le VG) ==='; "
"(sudo -n vgs --noheadings --units g -o vg_name,vg_size,vg_free 2>/dev/null || "
" vgs --noheadings --units g -o vg_name,vg_size,vg_free 2>/dev/null) | head -10 || echo '(pas LVM ou commande non autorisee)'; "
"echo; echo '=== Logical Volumes (filtre log/var) ==='; "
"(sudo -n lvs --noheadings --units g -o lv_name,vg_name,lv_size,lv_attr 2>/dev/null || "
" lvs --noheadings --units g -o lv_name,vg_name,lv_size,lv_attr 2>/dev/null) | grep -iE 'log|var' || echo '(pas de LV log/var ou non lisible)'; "
"echo; echo '=== FS type sur /var/log ==='; "
"(stat -f -c '%T' /var/log 2>/dev/null) || (df -T /var/log 2>/dev/null | awk 'NR==2{print $2}') || echo '(stat KO)'"
),
# Dedicated logrotate config (if any), then the agent's own log configuration file.
"logrotate_config": (
"FOUND=0; "
"for f in /etc/logrotate.d/qualys-cloud-agent /etc/logrotate.d/qualys "
"/etc/logrotate.d/qualysagent; do "
" if [ -e \"$f\" ]; then echo \"=== $f ===\"; (cat \"$f\" 2>/dev/null || sudo -n cat \"$f\" 2>/dev/null); FOUND=1; fi; "
"done; "
"if [ $FOUND -eq 0 ]; then echo '(pas de config logrotate dediee Qualys — l agent gere ses logs en interne)'; fi; "
"echo; echo '=== /etc/qualys/cloud-agent/qagent-log.conf ==='; "
"(cat /etc/qualys/cloud-agent/qagent-log.conf 2>/dev/null || sudo -n cat /etc/qualys/cloud-agent/qagent-log.conf 2>/dev/null) || echo '(non trouve / non lisible)'"
),
# Last 50 system-log entries about the agent: journalctl, else messages, else syslog.
"system_log": (
"if command -v journalctl >/dev/null 2>&1; then "
" out=$(journalctl -u qualys-cloud-agent --no-pager -n 50 2>/dev/null || sudo -n journalctl -u qualys-cloud-agent --no-pager -n 50 2>/dev/null); "
" if [ -n \"$out\" ]; then echo \"$out\"; else echo '(journalctl: aucune entree ou non autorise)'; fi; "
"elif [ -e /var/log/messages ]; then "
" echo '--- /var/log/messages (filtre qualys, 50 derniers) ---'; "
" out=$(grep -i qualys /var/log/messages 2>/dev/null | tail -50 || sudo -n grep -i qualys /var/log/messages 2>/dev/null | tail -50); "
" if [ -n \"$out\" ]; then echo \"$out\"; else echo '(aucune entree qualys ou sudo refuse)'; fi; "
"elif [ -e /var/log/syslog ]; then "
" echo '--- /var/log/syslog (filtre qualys, 50 derniers) ---'; "
" out=$(grep -i qualys /var/log/syslog 2>/dev/null | tail -50 || sudo -n grep -i qualys /var/log/syslog 2>/dev/null | tail -50); "
" if [ -n \"$out\" ]; then echo \"$out\"; else echo '(aucune entree qualys ou sudo refuse)'; fi; "
"else "
" echo 'logs systeme indisponibles (journalctl absent, messages/syslog non trouves)'; "
"fi"
),
}
import threading as _threading
# State of the background Qualys agent audits, one entry per hostname:
# hostname -> {status, result, started_at, finished_at, error}
_qualys_audit_cache = {}
# Held around every access to _qualys_audit_cache made by
# start_qualys_audit_async() and get_qualys_audit_state().
_qualys_audit_lock = _threading.Lock()
def _analyze_qualys_audit(r):
"""Analyse les sorties d'audit pour suggerer des resolutions concretes.
Retourne liste de {severity: critical|high|medium|low, title, fix}."""
import re
suggestions = []
s_status = (r.get("agent_status") or "").lower()
s_log = (r.get("agent_log") or "").lower()
s_sys = (r.get("system_log") or "").lower()
s_disk = (r.get("disk_space") or "")
s_conn = (r.get("qualys_connectivity") or "").lower()
s_ver = (r.get("agent_version") or "")
s_lvm = (r.get("lvm_info") or "")
s_lrt = (r.get("logrotate_config") or "").lower()
# Disque saturé / agent ne peut écrire
disk_full = " 100%" in s_disk or "no space left" in (s_log + s_sys)
if disk_full:
suggestions.append({
"severity": "critical",
"title": "Partition /var/log apparaît saturée",
"fix": "Constat : une partition est à 100% et/ou l'agent rapporte 'no space left'. "
"Hypothèse : pourrait empêcher l'agent d'écrire ses logs.\n"
"Proposition : ouvrir un ticket support pour vérifier l'état du disque "
"et envisager soit un cleanup des logs archivés, soit une extension du FS "
"(snapshot vCenter recommandé avant toute action)."
})
# Si LVM avec free dans VG -> note diagnostic
m_vg = re.search(r"(\S+)\s+([\d.]+)g\s+([\d.]+)g", s_lvm.lower())
if m_vg and float(m_vg.group(3)) > 0.5:
vg_name = m_vg.group(1)
free_gb = float(m_vg.group(3))
suggestions.append({
"severity": "high",
"title": f"LVM : extension FS potentiellement possible (VG {vg_name} ~{free_gb}G libres)",
"fix": f"Constat : le VG {vg_name} semble disposer de ~{free_gb}G non alloués. "
f"Proposition : ticket support pour validation et extension du LV /var/log si pertinent "
f"(snapshot vCenter à prévoir au préalable)."
})
if "cannot write file" in s_sys or "logger initialization failed" in s_sys:
suggestions.append({
"severity": "critical",
"title": "L'agent semble ne pas pouvoir écrire son log",
"fix": "Constat : présence de 'Cannot write file' / 'Logger initialization failed' dans les logs. "
"Hypothèses possibles : disque saturé, permissions cassées sur /var/log/qualys. "
"Proposition : ticket support pour analyse."
})
# Crash loop
m = re.search(r"restart counter is at (\d+)", s_sys)
if m and int(m.group(1)) > 50:
suggestions.append({
"severity": "high",
"title": f"Possible boucle de redémarrage (~{m.group(1)} restarts observés)",
"fix": "Constat : le compteur de restart systemd est élevé. "
"Hypothèse : crash loop persistant. "
"Proposition : ticket support pour analyse de la cause racine et arrêt temporaire du service "
"le temps de l'investigation (afin de limiter le bruit dans les logs)."
})
# Connectivité KO
if any(k in s_conn for k in ["connexion directe echec", "connection refused", "timed out",
"could not resolve", "no route", "unreachable", "dns ko"]):
suggestions.append({
"severity": "high",
"title": "Connectivité Qualys cloud apparaît KO (flux direct)",
"fix": "Constat : le test direct vers qagpublic.qg1.apps.qualys.eu:443 ne répond pas. "
"Hypothèse : flux 443 sortant peut-être bloqué côté firewall périmétrique. "
"Proposition : ticket réseau pour vérifier les règles de flux applicables à ce serveur, "
"en comparaison avec un serveur où l'agent fonctionne."
})
if "certificate verify failed" in s_conn or ("ssl" in s_conn and "verify" in s_conn):
suggestions.append({
"severity": "high",
"title": "Erreur TLS/SSL observée",
"fix": "Constat : la vérification du certificat semble échouer. "
"Hypothèses : interception SSL par un équipement intermédiaire, ou bundle CA système à mettre à jour. "
"Proposition : ticket support sécu/réseau pour analyse."
})
# Service désactivé / arrêté
if "masked" in s_status:
suggestions.append({
"severity": "medium",
"title": "Service apparaît masked",
"fix": "Constat : le service systemd Qualys semble masqué. "
"Proposition : ticket support pour identifier la raison (manuel ? configuration ?) "
"et décider de l'action."
})
elif "disabled" in s_status:
suggestions.append({
"severity": "medium",
"title": "Service apparaît disabled au boot",
"fix": "Constat : le service ne démarrera pas automatiquement au prochain reboot. "
"Hypothèses : volontaire (serveur en décom) ou oubli post-intervention. "
"Proposition : ticket support pour clarifier."
})
elif any(k in s_status for k in ["inactive (dead)", "stopped", "not running"]) \
and "active" not in s_status:
suggestions.append({
"severity": "medium",
"title": "Service apparaît arrêté",
"fix": "Constat : l'agent ne semble plus en cours d'exécution. "
"Proposition : ticket support pour identifier la cause de l'arrêt (voir logs ci-dessous)."
})
# Agent obsolète
if re.match(r"^qualys-cloud-agent-([0-5]\.|6\.[01]\.)", s_ver):
suggestions.append({
"severity": "low",
"title": f"Version agent ancienne détectée ({s_ver.strip()})",
"fix": "Constat : version antérieure à 7.x, potentiellement plus supportée. "
"Proposition : à inclure dans le plan de MAJ des agents Qualys."
})
# Logrotate Qualys mal configuré
if s_lrt and "qualys" in s_lrt:
if "compress" in s_lrt and ("nocompress" in s_lrt or
not re.search(r"^\s*compress\s*$", s_lrt, re.MULTILINE)):
suggestions.append({
"severity": "medium",
"title": "Logrotate Qualys : compression semble désactivée",
"fix": "Constat : la directive 'compress' n'apparaît pas active. "
"Hypothèse : les logs archivés peuvent occuper 5-10× plus de place sans compression. "
"Proposition : ticket support pour vérifier la configuration logrotate."
})
if "rotate" not in s_lrt:
suggestions.append({
"severity": "low",
"title": "Logrotate Qualys : directive rotate absente",
"fix": "Constat : pas de politique de rétention détectée. "
"Hypothèse : les logs pourraient s'accumuler indéfiniment. "
"Proposition : ticket support pour ajout d'une politique adaptée."
})
# Core dump + package absent = installation incomplète / corrompue
if ("core-dump" in s_sys or "core dumped" in s_sys or "abrt" in s_sys.lower()) and \
("introuvable" in s_ver.lower() or s_ver.lower().startswith("version introuvable")):
suggestions.append({
"severity": "critical",
"title": "Possible installation Qualys cassée (core dump + package non détecté)",
"fix": "Constat : service systemd actif mais aucun package qualys-cloud-agent dans la base RPM, "
"et core dumps répétés. "
"Hypothèse : désinstallation incomplète ou installation corrompue. "
"Proposition : ticket support pour cleanup complet et réinstallation via RPM SANEF "
"(/root/QualysCloudAgent.rpm) avec ré-activation."
})
elif "core-dump" in s_sys or "core dumped" in s_sys:
suggestions.append({
"severity": "high",
"title": "Core dumps observés sur l'agent",
"fix": "Constat : le binaire semble crasher au démarrage (signal SIGABRT). "
"Hypothèses : lib système cassée, incompatibilité version agent vs OS, conflit. "
"Proposition : ticket support pour analyse approfondie."
})
# OS EOL (RHEL 5/6)
s_os = (r.get("os_release") or "").lower()
if "release 5" in s_os or "release 6" in s_os:
suggestions.append({
"severity": "low",
"title": "OS détecté en fin de vie",
"fix": "Constat : RHEL 5/6 EOL. "
"Hypothèse : l'agent Qualys 7.x peut ne pas être supporté sur cette version. "
"Proposition : à intégrer au plan de migration/décom du serveur."
})
return suggestions
def start_qualys_audit_async(hostname, force=False):
    """Start audit_qualys_agent_only for *hostname* in a background thread.

    A run that is still "pending" and was started less than 120 seconds ago
    is reused unless *force* is true.

    Returns:
        True when a new background run was started, False when a recent
        pending run already exists for this hostname.
    """
    with _qualys_audit_lock:
        previous = _qualys_audit_cache.get(hostname)
        still_pending = previous and previous.get("status") == "pending"
        if still_pending and not force:
            elapsed = datetime.now() - previous["started_at"]
            if elapsed.total_seconds() < 120:
                return False
        # Fresh cache entry; the background runner fills in the outcome.
        _qualys_audit_cache[hostname] = dict(
            status="pending",
            result=None,
            started_at=datetime.now(),
            finished_at=None,
            error=None,
        )

    def _record(**outcome):
        """Merge *outcome* plus the finish time into the cached entry."""
        with _qualys_audit_lock:
            entry = _qualys_audit_cache.get(hostname, {})
            entry.update(outcome, finished_at=datetime.now())
            _qualys_audit_cache[hostname] = entry

    def _runner():
        # Any failure of the audit is stored as an "error" state, not raised.
        try:
            _record(status="ok", result=audit_qualys_agent_only(hostname))
        except Exception as ex:
            _record(status="error", error=str(ex))

    _threading.Thread(target=_runner, daemon=True).start()
    return True
def get_qualys_audit_state(hostname):
    """Return a shallow copy of the cached audit state for *hostname*.

    Returns None when no state (or an empty state) is cached for the host.
    """
    with _qualys_audit_lock:
        # Copy under the lock so callers never share the cached dict itself.
        snapshot = dict(_qualys_audit_cache.get(hostname, {}))
    return snapshot if snapshot else None
def audit_qualys_agent_only(hostname):
    """Run the Qualys-agent-only audit on one host over SSH.

    The host is resolved with _resolve and reached with _connect. Every
    command in QUALYS_AGENT_CMDS is then run through one combined shell
    script, and each command's output is stored in the result under its key.

    Args:
        hostname: Host name passed to _resolve, _connect and
            _resolve_ssh_method.

    Returns:
        dict with "hostname", "audit_date", "status", "connection_method",
        "resolved_fqdn" and one entry per QUALYS_AGENT_CMDS key.

        "status" is "CONNECTION_FAILED" when resolution or the SSH connection
        fails; the dict is then returned immediately without the summary
        keys below. Otherwise it is "OK" or "ERROR" and the dict also holds
        "suggestions", the booleans "check_connectivity", "check_disk",
        "check_service", "check_installed", and "disk_saturated_line".

        "error_msg" is set when status is "ERROR", and also when the remote
        execution itself raised (the sections then read "(parsing failed)").
    """
    import shlex  # local import: only needed to quote the sudo command

    result = {
        "hostname": hostname,
        "audit_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "status": "PENDING",
        "connection_method": None,
        "resolved_fqdn": None,
    }
    for k in QUALYS_AGENT_CMDS:
        result[k] = None

    target = _resolve(hostname)
    if not target:
        result["status"] = "CONNECTION_FAILED"
        result["connection_method"] = f"DNS: aucun suffixe resolu ({hostname})"
        return result
    result["resolved_fqdn"] = target

    client = _connect(target, hostname)
    if not client:
        result["status"] = "CONNECTION_FAILED"
        result["connection_method"] = f"SSH: connexion echouee a {target}"
        return result

    # From here on the client is always closed, including when
    # _resolve_ssh_method raises (that exception still propagates).
    try:
        method = _resolve_ssh_method(hostname) or "ssh_key"
        result["connection_method"] = f"{method} -> {target}"
        try:
            # All commands go into one bash script delimited by markers, so
            # the audit does not open one SSH channel per command (PSMP
            # limits the number of channels: "Timeout opening channel").
            combined_parts = []
            for key, cmd in QUALYS_AGENT_CMDS.items():
                combined_parts.append(f"echo '__SECTION_{key}_START__'")
                combined_parts.append(cmd)
                combined_parts.append(f"echo '__SECTION_{key}_END__'")
            combined = "; ".join(combined_parts)

            # Longer timeout (60s) because the combined script chains
            # several commands.
            try:
                _, stdout_chk, _ = client.exec_command("id -u", timeout=5)
                is_root = stdout_chk.read().decode().strip() == "0"
                if is_root:
                    full_cmd = combined
                else:
                    full_cmd = "sudo bash -c " + shlex.quote(combined)
                _, stdout, stderr = client.exec_command(full_cmd, timeout=60)
                big_out = stdout.read().decode("utf-8", errors="replace")
                err = stderr.read().decode("utf-8", errors="replace")
                if not is_root and not big_out.strip() and err.strip():
                    # sudo was refused: retry once without it. Pointless as
                    # root, where it would re-run the exact same command.
                    _, stdout2, _ = client.exec_command(combined, timeout=60)
                    big_out = stdout2.read().decode("utf-8", errors="replace")
            except Exception as ex_inner:
                big_out = f"ERROR: {ex_inner}"
                # Keep the cause visible: the marker parsing below replaces
                # every section with "(parsing failed)".
                result["error_msg"] = str(ex_inner)

            # Split the combined output back into sections using the markers.
            for key in QUALYS_AGENT_CMDS:
                start_marker = f"__SECTION_{key}_START__"
                end_marker = f"__SECTION_{key}_END__"
                try:
                    after_start = big_out.split(start_marker, 1)[1]
                    section = after_start.split(end_marker, 1)[0].strip()
                except IndexError:
                    # Start marker absent from the output.
                    section = "(parsing failed)"
                result[key] = section or "(empty)"
            result["status"] = "OK"
        except Exception as e:
            result["status"] = "ERROR"
            result["error_msg"] = str(e)
    finally:
        try:
            client.close()
        except Exception:
            # Best-effort close: a failure here must not mask the result.
            pass

    if result["status"] != "OK":
        result["suggestions"] = []
        result["check_connectivity"] = False
        result["check_disk"] = False
        result["check_service"] = False
        result["check_installed"] = False
        result["disk_saturated_line"] = ""
        return result

    # Analyse the outputs to suggest resolutions.
    result["suggestions"] = _analyze_qualys_audit(result)

    # Boolean summary used by the checklist at the top of the page.
    s_conn = result.get("qualys_connectivity") or ""
    s_disk = result.get("disk_space") or ""
    s_status = (result.get("agent_status") or "").lower()
    s_ver = result.get("agent_version") or ""
    s_sys = result.get("system_log") or ""

    result["check_connectivity"] = "Connectivité OK" in s_conn
    result["check_disk"] = " 100%" not in s_disk
    result["check_service"] = "active (running)" in s_status or \
        ("running" in s_status and "active" in s_status)
    result["check_installed"] = bool(re.search(r"qualys-cloud-agent[-\s]\d", s_ver))

    # First line reporting 100% usage; stays "" when the disk check passed,
    # since no line can then contain " 100%".
    result["disk_saturated_line"] = next(
        (ln.strip() for ln in s_disk.split("\n") if " 100%" in ln),
        "",
    )

    # Keep only the last 10 non-blank lines of the system log.
    sys_lines = [line for line in s_sys.split("\n") if line.strip()]
    result["system_log"] = "\n".join(sys_lines[-10:])
    return result