patchcenter/app/services/quickwin_prereq_service.py
Khalid MOUTAOUAKIL e96d79aae3 QuickWin: prereq/snapshot services, referentiel, logs, correspondance
- Split quickwin services: prereq, snapshot, log services
- Add referentiel router and template
- QuickWin detail: prereq/snapshot terminal divs for production
- Server edit partial updates
- QuickWin correspondance and logs templates
- Base template updates

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-10 18:13:00 +02:00

388 lines
12 KiB
Python

"""QuickWin prerequisite service — DNS resolution, SSH, disk, Satellite.
Adapted to the SANEF context: CyberArk PSMP + SSH key depending on the server's method."""
import socket
import logging
import os
log = logging.getLogger("quickwin.prereq")

# paramiko is optional at import time: the module still loads without it and
# PARAMIKO_OK records whether the SSH-based checks can run.
try:
    import paramiko
except ImportError:
    PARAMIKO_OK = False
    log.warning("paramiko non disponible — checks SSH impossibles")
else:
    PARAMIKO_OK = True

# --- Constants ---
DOMP = "sanef.groupe"  # prod/preprod/dev domain
DOMR = "sanef-rec.fr"  # recette/test domain
PSMP_HOST = "psmp.sanef.fr"
CYBR_USER = "CYBP01336"
TARGET_USER = "cybsecope"
SSH_KEY_FILE_DEFAULT = "/opt/patchcenter/keys/id_rsa_cybglobal.pem"
SSH_TIMEOUT = 15

# Disk threshold (% used); _check_disk reports a mount as KO when its usage
# is at or above this value.
DISK_MAX_PCT = 90

# Substrings identifying CyberArk banner lines to drop from command output.
BANNER_FILTERS = [
    "GROUPE SANEF",
    "propriete du Groupe",
    "accederait",
    "emprisonnement",
    "Article 323",
    "code penal",
    "Authorized uses only",
    "CyberArk",
    "This session",
    "session is being",
]
def _get_settings(db):
    """Load the settings used by this service from the ``settings`` table.

    Only the keys ``psmp_host``, ``default_ssh_timeout`` and
    ``disk_min_root_mb`` are queried.

    Returns a dict mapping each key found to its stored value.
    """
    from sqlalchemy import text

    query = text(
        "SELECT key, value FROM settings WHERE key IN "
        "('psmp_host','default_ssh_timeout','disk_min_root_mb')"
    )
    settings = {}
    for row in db.execute(query).fetchall():
        settings[row.key] = row.value
    return settings
def _get_secret(db, key):
    """Fetch a decrypted secret through the secrets service.

    Best-effort lookup: any failure (import error, database error,
    decryption error, ...) yields ``None`` so callers can fall back to a
    default. The failure is logged so that a missing secret can be
    diagnosed; only the key name and the exception type are logged, never
    the secret value or the exception message.

    Args:
        db: database session handed to the secrets service.
        key: name of the secret to read.

    Returns:
        Whatever the secrets service returns for ``key``, or ``None`` when
        the lookup fails.
    """
    try:
        from ..services.secrets_service import get_secret
        return get_secret(db, key)
    except Exception as exc:
        logging.getLogger("quickwin.prereq").warning(
            "secret '%s' indisponible (%s)", key, type(exc).__name__
        )
        return None
# =========================================================
# 1. RESOLUTION DNS
# =========================================================
def _detect_env(hostname):
    """Guess the environment from the second letter of the hostname.

    Mapping (case-insensitive): p=prod, i=preprod, r=recette, v/t=test,
    d=dev. Hostnames shorter than two characters, or with any other second
    letter, give "unknown".
    """
    if len(hostname) < 2:
        return "unknown"
    env_by_letter = {
        "p": "prod",
        "i": "preprod",
        "r": "recette",
        "v": "test",
        "t": "test",
        "d": "dev",
    }
    return env_by_letter.get(hostname[1].lower(), "unknown")
def _resolve_fqdn(hostname, domain_ltd=None, env_code=None):
    """Resolve a hostname to an FQDN that answers in DNS.

    Args:
        hostname: short hostname or FQDN; surrounding whitespace is ignored.
        domain_ltd: preferred domain, used only when the environment cannot
            be determined from ``env_code`` or from the hostname.
        env_code: optional environment code (PRD/PPR/DEV, REC, TES/TS1/TS2)
            that overrides the environment guessed from the hostname.

    Returns:
        ``(fqdn, None)`` on success, ``(None, error_msg)`` otherwise.

    Domain order:
        - prod/preprod/dev: DOMP first, then DOMR
        - recette/test: DOMR first, then DOMP
        - unknown: ``domain_ltd`` first (when given), then DOMP or DOMR
    """
    # An empty or blank hostname would otherwise be turned into ".<domain>",
    # which the resolver rejects with an exception instead of a clean failure.
    hostname = hostname.strip() if isinstance(hostname, str) else ""
    if not hostname:
        return None, "DNS KO: hostname vide"

    if "." in hostname:
        # Already an FQDN: test it as is, no domain is appended.
        if _dns_resolves(hostname):
            return hostname, None
        return None, f"FQDN {hostname} non resolvable"

    # Work out which domain to try first.
    env = _detect_env(hostname)
    if env_code:
        ec = env_code.strip().upper()
        if ec in ("PRD", "PPR", "DEV"):
            env = "prod"
        elif ec in ("REC",):
            env = "recette"
        elif ec in ("TES", "TS1", "TS2"):
            env = "test"

    if env in ("prod", "preprod", "dev"):
        domains_order = [DOMP, DOMR]
    elif env in ("recette", "test"):
        domains_order = [DOMR, DOMP]
    elif domain_ltd and domain_ltd.strip():
        # Unknown environment: trust the declared domain, then the other one.
        preferred = domain_ltd.strip()
        alt = DOMR if preferred == DOMP else DOMP
        domains_order = [preferred, alt]
    else:
        domains_order = [DOMP, DOMR]

    # Try each candidate in order; the first one that resolves wins.
    for dom in domains_order:
        fqdn = f"{hostname}.{dom}"
        if _dns_resolves(fqdn):
            return fqdn, None
    return None, f"DNS KO: {hostname} non resolu ({'/'.join(domains_order)})"
def _dns_resolves(fqdn):
    """Return True when ``fqdn`` resolves to at least one IPv4 address.

    Returns False for an empty or ``None`` name, for a name the resolver
    cannot find, and for a syntactically invalid name (empty label, or a
    label longer than 63 characters).
    """
    # getaddrinfo(None, ...) resolves to the local host and would be a
    # false positive; an empty name is never a valid target either.
    if not fqdn:
        return False
    try:
        socket.getaddrinfo(fqdn, 22, socket.AF_INET, socket.SOCK_STREAM)
        return True
    except (socket.gaierror, socket.herror, OSError, UnicodeError):
        # UnicodeError comes from the IDNA encoding of the host name
        # ("label empty or too long"), before any lookup is attempted.
        return False
# =========================================================
# 2. TEST SSH
# =========================================================
def _get_ssh_key_path(db=None):
    """Return the path of the SSH private key file, or None if none exists.

    The path stored under the ``ssh_key_file`` secret is tried first (only
    when ``db`` is given), then the built-in default path. A candidate is
    accepted only if the file exists on disk.
    """
    candidates = []
    if db:
        configured = _get_secret(db, "ssh_key_file")
        if configured and configured.strip():
            candidates.append(configured.strip())
    candidates.append(SSH_KEY_FILE_DEFAULT)

    for path in candidates:
        if os.path.exists(path):
            return path
    return None
def _load_ssh_key(db=None):
    """Load the SSH private key from the configured or default path.

    The file is tried as RSA, then Ed25519, then ECDSA; the first type that
    parses is returned. Returns None when no key file is found or when no
    type can load it.
    """
    path = _get_ssh_key_path(db)
    if not path:
        return None

    key_types = (paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey)
    for key_type in key_types:
        try:
            loaded = key_type.from_private_key_file(path)
        except Exception:
            # Wrong key type (or unreadable with this type): try the next.
            continue
        return loaded
    return None
def _ssh_via_psmp(fqdn, password):
    """Open an SSH session to ``fqdn`` through the CyberArk PSMP gateway.

    Authentication is keyboard-interactive, answering every prompt with
    ``password``. The login sent to the gateway has the form
    ``<CYBR_USER>@<TARGET_USER>@<fqdn>``.

    Args:
        fqdn: target server as known by the gateway.
        password: password used to answer the interactive prompts.

    Returns:
        ``(client, None)`` on success, where ``client`` is a
        ``paramiko.SSHClient`` wrapping the authenticated transport, or
        ``(None, error_msg)`` on failure.
    """
    if not password:
        return None, "PSMP: pas de mot de passe configure"

    sock = None
    transport = None
    try:
        username = f"{CYBR_USER}@{TARGET_USER}@{fqdn}"
        # Connect the socket ourselves so the TCP connect is bounded by
        # SSH_TIMEOUT, as the key-based connection already is.
        sock = socket.create_connection((PSMP_HOST, 22), timeout=SSH_TIMEOUT)
        transport = paramiko.Transport(sock)
        # NOTE(review): connect() without a hostkey argument performs no
        # host key verification of the gateway.
        transport.connect()

        def handler(title, instructions, prompt_list):
            # Same answer for every prompt the gateway sends.
            return [password] * len(prompt_list)

        transport.auth_interactive(username, handler)
        client = paramiko.SSHClient()
        # SSHClient has no public way to adopt an existing transport.
        client._transport = transport
        return client, None
    except Exception as e:
        # Release the transport and socket so a failed attempt does not leak
        # a connection (the transport also owns a background thread).
        for resource in (transport, sock):
            if resource is not None:
                try:
                    resource.close()
                except Exception:
                    pass
        return None, f"PSMP: {str(e)[:120]}"
def _ssh_via_key(fqdn, ssh_user, key):
    """Open a direct SSH connection to ``fqdn`` using a private key.

    Args:
        fqdn: server to connect to (port 22).
        ssh_user: login name.
        key: private key object passed to paramiko as ``pkey``.

    Returns:
        ``(client, None)`` on success, ``(None, error_msg)`` on failure.
    """
    if not key:
        return None, "SSH key: cle non trouvee"
    if not ssh_user:
        return None, "SSH key: user non configure"

    client = None
    try:
        client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts any unknown host key without
        # verification.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(fqdn, port=22, username=ssh_user, pkey=key,
                       timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False)
        return client, None
    except Exception as e:
        # A failed connect() can leave a started transport behind (e.g. on
        # authentication failure); close the client to release it.
        if client is not None:
            try:
                client.close()
            except Exception:
                pass
        return None, f"SSH key: {str(e)[:120]}"
def _ssh_connect(fqdn, ssh_method, db):
    """Connect to the server with the requested method, then the other one.

    ``ssh_method == "ssh_psmp"`` tries the PSMP gateway first and falls back
    to a key-based connection; any other value tries the key first and falls
    back to PSMP only when a PSMP password is configured.

    Returns ``(client, None)`` on success. On failure returns
    ``(None, error_msg)`` where the message is the error of the first
    (preferred) attempt; the fallback's error is discarded.
    """
    if not PARAMIKO_OK:
        return None, "paramiko non installe"

    if ssh_method == "ssh_psmp":
        # Preferred: PSMP, with the password read from the secrets store.
        psmp_password = _get_secret(db, "ssh_pwd_default_pass")
        session, first_error = _ssh_via_psmp(fqdn, psmp_password)
        if session:
            return session, None

        # Fallback: key-based connection.
        private_key = _load_ssh_key(db)
        login = _get_secret(db, "ssh_pwd_default_user") or TARGET_USER
        fallback_session, _ = _ssh_via_key(fqdn, login, private_key)
        if fallback_session:
            return fallback_session, None
        return None, first_error

    # Preferred: key-based connection (user from secrets, key from file).
    login = _get_secret(db, "ssh_pwd_default_user") or TARGET_USER
    private_key = _load_ssh_key(db)
    session, first_error = _ssh_via_key(fqdn, login, private_key)
    if session:
        return session, None

    # Fallback: PSMP, attempted only if a password is available.
    psmp_password = _get_secret(db, "ssh_pwd_default_pass")
    if psmp_password:
        fallback_session, _ = _ssh_via_psmp(fqdn, psmp_password)
        if fallback_session:
            return fallback_session, None
    return None, first_error
def _ssh_exec(client, cmd, timeout=12):
    """Run ``cmd`` over SSH and return ``(stdout, stderr, returncode)``.

    Lines of stdout containing one of the ``BANNER_FILTERS`` substrings are
    removed. Output is decoded as UTF-8 with replacement of invalid bytes.

    Args:
        client: connected SSH client exposing ``get_transport()``.
        cmd: command line to execute remotely.
        timeout: per-read timeout in seconds; also bounds the wait for the
            exit status.

    Returns:
        ``(stdout, stderr, returncode)``. ``returncode`` is -1 when the exit
        status does not arrive within ``timeout``, or when any error occurs
        (in which case stdout is empty and stderr holds the error message).
    """
    chan = None
    try:
        chan = client.get_transport().open_session()
        chan.settimeout(timeout)
        chan.exec_command(cmd)

        # Read stdout until EOF; a read error or timeout ends the read and
        # whatever was received so far is kept.
        out_chunks = []
        while True:
            try:
                chunk = chan.recv(8192)
            except Exception:
                break
            if not chunk:
                break
            out_chunks.append(chunk)

        # Same for stderr, so that more than one buffer's worth is kept.
        err_chunks = []
        while True:
            try:
                chunk = chan.recv_stderr(8192)
            except Exception:
                break
            if not chunk:
                break
            err_chunks.append(chunk)

        # recv_exit_status() waits without any timeout; only call it once
        # the status is known to be available, otherwise report -1.
        if chan.exit_status_ready() or chan.status_event.wait(timeout):
            rc = chan.recv_exit_status()
        else:
            rc = -1

        # Drop gateway banner lines from stdout.
        out_str = b"".join(out_chunks).decode("utf-8", errors="replace")
        kept = [
            line for line in out_str.splitlines()
            if not any(marker in line for marker in BANNER_FILTERS)
        ]
        err_str = b"".join(err_chunks).decode("utf-8", errors="replace")
        return "\n".join(kept), err_str, rc
    except Exception as e:
        return "", str(e), -1
    finally:
        # Always release the channel, including on error paths.
        if chan is not None:
            try:
                chan.close()
            except Exception:
                pass
# =========================================================
# 3. TEST ESPACE DISQUE
# =========================================================
def _check_disk(client):
    """Check disk usage of ``/`` and ``/var`` through ``sudo df``.

    Returns ``(ok, detail_msg)``. ``ok`` is False as soon as one mount point
    is at or above ``DISK_MAX_PCT`` percent used. When ``df`` fails or its
    output cannot be parsed, the check is reported as not verified and
    ``ok`` stays True (best effort: an unreadable result does not block).
    """
    out, err, rc = _ssh_exec(client, "sudo df / /var --output=target,pcent 2>/dev/null | tail -n +2")
    if rc != 0 or not out.strip():
        return True, "Disque: non verifie (df echoue)"

    ok = True
    parts = []
    seen_mounts = set()
    for line in out.strip().splitlines():
        tokens = line.split()
        if len(tokens) < 2:
            continue
        mount = tokens[0]
        # df prints one row per argument: when /var is not a separate
        # filesystem the same mount point shows up twice. Report it once.
        if mount in seen_mounts:
            continue
        pct_str = tokens[-1].replace("%", "").strip()
        if not pct_str.isdigit():
            continue
        seen_mounts.add(mount)
        pct = int(pct_str)
        if pct >= DISK_MAX_PCT:
            ok = False
            parts.append(f"{mount}={pct}% KO")
        else:
            parts.append(f"{mount}={pct}%")

    if not parts:
        return True, "Disque: non verifie"
    return ok, "Disque: " + ", ".join(parts)
# =========================================================
# 4. TEST SATELLITE / YUM REPOS
# =========================================================
def _check_satellite(client):
    """Check Satellite registration, falling back to the YUM repo list.

    Returns ``(ok, detail_msg)``:
    - OK when ``subscription-manager status`` succeeds and its first lines
      contain "Current";
    - otherwise OK when the last line of ``yum repolist`` is non-empty and
      does not report "repolist: 0";
    - KO in every other case.
    """
    # First choice: subscription-manager.
    status_out, _, status_rc = _ssh_exec(
        client, "sudo subscription-manager status 2>/dev/null | head -5"
    )
    if status_rc == 0 and "Current" in status_out:
        return True, "Satellite: OK (subscription-manager)"

    # Fallback: look at the last line of the repo list.
    repo_out, _, repo_rc = _ssh_exec(client, "sudo yum repolist 2>/dev/null | tail -1")
    last_line = repo_out.strip()
    if repo_rc != 0 or not last_line:
        return False, "Satellite: KO (pas de repos ni subscription)"
    if "repolist: 0" in last_line.lower():
        # "repolist: 0" means no repository is available.
        return False, "Satellite: KO (0 repos)"
    return True, f"Satellite: OK ({last_line[:60]})"
# =========================================================
# ORCHESTRATEUR PRINCIPAL
# =========================================================
def check_server_prereqs(hostname, db, domain_ltd=None, env_code=None,
                         ssh_method="ssh_key"):
    """Run the full QuickWin prerequisite check for one server.

    Steps, in order:
        1. DNS resolution (domain order depends on the environment)
        2. SSH connection (PSMP or key, according to ``ssh_method``)
        3. Disk usage (sudo df)
        4. Satellite / YUM

    Returns a dict with the keys ``dns_ok``, ``ssh_ok``, ``disk_ok``,
    ``satellite_ok``, ``fqdn``, ``detail`` and ``skip``. ``skip`` is True
    when DNS or SSH fails, in which case the later steps are not run.
    """
    report = dict(
        dns_ok=False, ssh_ok=False, disk_ok=False, satellite_ok=False,
        fqdn="", detail="", skip=False,
    )
    steps = []

    # 1. DNS resolution
    resolved, dns_error = _resolve_fqdn(hostname, domain_ltd, env_code)
    if not resolved:
        log.warning(f"[{hostname}] {dns_error}")
        report.update(detail=dns_error, skip=True)
        return report
    report.update(dns_ok=True, fqdn=resolved)
    steps.append(f"DNS: OK ({resolved})")

    # 2. SSH connection
    ssh, ssh_error = _ssh_connect(resolved, ssh_method, db)
    if not ssh:
        steps.append(f"SSH: KO ({ssh_error})")
        log.warning(f"[{hostname}] SSH KO: {ssh_error}")
        report.update(detail=" | ".join(steps), skip=True)
        return report
    report["ssh_ok"] = True
    # The label reflects the requested method.
    steps.append("SSH: OK (PSMP)" if ssh_method == "ssh_psmp" else "SSH: OK (key)")

    # 3. Disk usage, then 4. Satellite, over the same connection.
    try:
        for flag, check in (("disk_ok", _check_disk),
                            ("satellite_ok", _check_satellite)):
            passed, message = check(ssh)
            report[flag] = passed
            steps.append(message)
    finally:
        # Always release the connection, even if a check raised.
        try:
            ssh.close()
        except Exception:
            pass

    report["detail"] = " | ".join(steps)
    return report