feat(prepatch): iteration multi-candidats SSH selon priorite SANEF

Probleme: vrexpbtex1 est sur .sanef.groupe (exception a la convention vr*=.sanef-rec.fr).
Le code tentait UN seul target via _resolve (TCP port 22 only) et echouait sans fallback SSH.

Logique de resolution revisee:
1. Convention en 1er: suffixe le plus probable selon prefixe (vr/lr/sr=>.sanef-rec.fr,
   vp/lp/sp=>.sanef.groupe, etc.)
2. servers.fqdn (BDD) en 2e position si renseigne et plausible (commence par hostname.)
3. Autres suffixes du referentiel ensuite

Implementation:
- _candidate_targets(hostname) : nouvelle fonction qui retourne la liste ordonnee
  des FQDN candidats a essayer SSH
- _connect(target, hostname, errors=...) : accepte une list errors optionnelle ou
  les exceptions de chaque tentative (PSMP/cle/password) sont append en clair pour
  diagnostic. Retro-compatible (errors=None par defaut)
- _connect_via_psmp(target, errors=...) : meme pattern
- _fqdn_is_consistent : assoupli, ne verifie plus la convention SANEF (qui rejetait
  les exceptions legitimes), juste que le FQDN commence par hostname.
- run_all_checks: itere _candidate_targets et essaie _connect sur chaque, accumule
  les erreurs de tous les candidats, retourne le 1er qui aboutit

UI:
- check_dns: si target trouve mais client KO, status=warn et liste des candidats tentes
- check_ssh: classification erreur (no route/timeout/refused/permission denied/etc.)
  + liste des candidats tentes + suggestion pour FQDN exception
This commit is contained in:
Pierre & Lumière 2026-05-07 19:31:14 +02:00
parent 1747447f82
commit 517b02f602
2 changed files with 158 additions and 76 deletions

View File

@ -24,7 +24,7 @@ import socket
import time
from typing import Callable, Dict, List, Any
from .realtime_audit_service import _resolve, _connect, PARAMIKO_OK
from .realtime_audit_service import _resolve, _connect, _candidate_targets, PARAMIKO_OK
log = logging.getLogger("patchcenter.prepatch")
@ -74,16 +74,30 @@ def _exec(client, cmd: str) -> Dict[str, Any]:
# ────────────────────────────────────────────────────────────────────────
def check_dns(ctx: Dict[str, Any]) -> Dict[str, Any]:
"""Résolution DNS du hostname (nom court → FQDN connu via base ou suffixes)."""
"""Résolution DNS du hostname (nom court → FQDN connu via base ou suffixes).
Status = ok si un candidat a SSH réussi (target défini ET client !== None),
warn si un FQDN candidat existe mais SSH a échoué partout,
ko si aucune résolution possible."""
hostname = ctx["hostname"]
target = ctx.get("target")
if target:
client_ok = ctx.get("client") is not None
tried = ctx.get("ssh_tried") or []
if target and client_ok:
return {
"name": "dns",
"label": "Résolution DNS",
"status": "ok",
"message": f"{hostname}{target}",
"details": "",
"details": (f"Candidats essayés : {', '.join(tried)}" if len(tried) > 1 else ""),
}
if target and tried:
return {
"name": "dns",
"label": "Résolution DNS",
"status": "warn",
"message": f"{hostname} : {len(tried)} FQDN tentés, SSH KO partout",
"details": "Candidats : " + ", ".join(tried),
}
# Si _resolve a échoué, on retente directement gethostbyname pour récupérer une IP
try:
@ -132,33 +146,36 @@ def check_ssh(ctx: Dict[str, Any]) -> Dict[str, Any]:
"details": "",
}
err = (ctx.get("ssh_error") or "").strip()
target = ctx.get("target") or "?"
# Classification du type d'erreur (pour message actionnable)
tried = ctx.get("ssh_tried") or []
err_low = err.lower()
# Classification du type d'erreur dominant
if "no route to host" in err_low or "network is unreachable" in err_low:
msg = f"Réseau injoignable ({target}) — vérifier routage/firewall"
msg = "Réseau injoignable — vérifier routage/firewall"
elif "connection timed out" in err_low or "timed out" in err_low:
msg = f"Timeout connexion vers {target} — port SSH 22 filtré ou hôte down"
msg = "Timeout connexion — port SSH 22 filtré ou hôte down"
elif "connection refused" in err_low:
msg = f"Port 22 refusé sur {target} — sshd arrêté ou bloqué"
elif "no matching" in err_low and ("kex" in err_low or "key exchange" in err_low or "host key" in err_low):
msg = f"Algos KEX incompatibles avec {target} — durcissement SSH"
elif "host key" in err_low or "hostkey" in err_low:
msg = f"Host key inconnue/changée pour {target} — known_hosts ?"
msg = "Port 22 refusé — sshd arrêté ou bloqué"
elif ("no matching" in err_low and ("kex" in err_low or "key exchange" in err_low)) \
or "host key" in err_low or "hostkey" in err_low:
msg = "Incompatibilité crypto SSH ou host key — algos KEX/host_keys"
elif "permission denied" in err_low or "authentication failed" in err_low:
msg = f"Authentification refusée par {target} — vérifier user/clé/password"
msg = "Authentification refusée — vérifier user/clé/password"
elif "no authentication methods" in err_low:
msg = f"Aucune méthode d'auth acceptée par {target}"
msg = "Aucune méthode d'auth acceptée"
elif "name or service not known" in err_low or "could not resolve" in err_low:
msg = f"DNS échoué côté SSH ({target})"
msg = "DNS échoué — aucun FQDN candidat ne résoud"
elif err:
msg = f"Échec SSH vers {target}"
msg = "Échec SSH sur tous les candidats DNS"
else:
msg = f"Échec connexion SSH vers {target} (raison inconnue)"
msg = "Échec connexion SSH (raison inconnue)"
if tried:
msg += f"{len(tried)} candidat(s) tenté(s) : {', '.join(tried)}"
details = (err or "Pas d'exception capturée") + \
"\nVérifier ssh_method/clé/PSMP/mot de passe dans Settings (section SSH)."
"\n\n→ Vérifier ssh_method/clé/PSMP/mot de passe dans Settings (section SSH)." + \
"\n→ Si l'hôte est sur un FQDN exception (ex: vr* sur .sanef.groupe), renseigner servers.fqdn explicitement."
return {
"name": "ssh",
"label": "Connexion SSH",
@ -341,24 +358,37 @@ def run_all_checks(hostname: str, row: Dict[str, Any] | None = None,
"""
t0 = time.time()
only_set = set(only) if only else None
target = _resolve(hostname)
candidates = _candidate_targets(hostname) if PARAMIKO_OK else []
client = None
ssh_error = None
ssh_method = None
if target and PARAMIKO_OK:
try:
client = _connect(target, hostname)
# _connect peut renvoyer un tuple (client, method) selon implem ; fallback :
if isinstance(client, tuple) and len(client) >= 1:
ssh_method = client[1] if len(client) > 1 else None
client = client[0]
except Exception as e:
log.warning(f"_connect raised on {hostname}: {e}")
ssh_error = f"{type(e).__name__}: {e}"
client = None
target = None
all_errors = [] # erreurs accumulées sur tous les candidats tentés
tried = [] # FQDNs essayés (pour debug)
if PARAMIKO_OK:
for cand in candidates:
tried.append(cand)
errs = []
try:
c = _connect(cand, hostname, errors=errs)
except Exception as e:
errs.append(f"{type(e).__name__}: {e}")
c = None
if c is not None:
client = c
target = cand
break
# Échec sur ce candidat, on accumule les raisons et passe au suivant
all_errors.append(f"{cand}: " + " | ".join(errs) if errs else f"{cand}: échec sans détail")
# Si aucun client mais on a au moins un candidat dont on a vérifié DNS, garde-le pour info
if target is None and tried:
target = tried[0] # juste pour affichage du 1er essayé
ssh_error = "\n".join(all_errors) if all_errors else None
ctx = {"hostname": hostname, "target": target, "client": client,
"row": row or {}, "ssh_error": ssh_error, "ssh_method": ssh_method}
"row": row or {}, "ssh_error": ssh_error,
"ssh_tried": tried}
results = []
for name, fn in CHECKS.items():
if only_set is not None and name not in only_set:

View File

@ -101,25 +101,46 @@ def _ordered_suffixes(hostname):
def _fqdn_is_consistent(hostname, fqdn):
"""Vérifie que le FQDN BDD respecte la convention SANEF par préfixe.
Évite d'utiliser un FQDN incohérent (ex: vrameased1.sanef.groupe alors
que vr* doit être .sanef-rec.fr)."""
if not fqdn or not hostname or len(hostname) < 2:
return True
fqdn_lc = fqdn.lower()
second = hostname[1].lower()
# vr/vt/vd/vv → .sanef-rec.fr obligatoire
if second in ("r", "t", "d", "v"):
return ".sanef-rec.fr" in fqdn_lc
# vp/sp/lp → .sanef.groupe ou .sanef.fr
if second in ("p", "l"):
return (".sanef.groupe" in fqdn_lc) or (".sanef.fr" in fqdn_lc)
# Préfixe inconnu : on accepte le FQDN tel quel
return True
"""Valide qu'un FQDN BDD est utilisable. Règle : le FQDN doit commencer
par le hostname suivi d'un point (sinon c'est probablement une donnée
cassée, ex: 'unknown.example.com' pour un host 'vpwhatever').
On NE VÉRIFIE PLUS la convention SANEF des préfixes (vr* .sanef-rec.fr)
parce qu'il y a des exceptions légitimes (ex: vrexpbtex1.sanef.groupe).
Si l'opérateur a explicitement renseigné un FQDN, on lui fait confiance.
La convention reste utilisée pour l'INFÉRENCE quand le FQDN BDD est vide
(cf _ordered_suffixes)."""
if not fqdn or not hostname:
return False
return fqdn.lower().startswith(hostname.lower() + ".")
def _resolve(hostname):
# 1. FQDN stocke en base — utilisé uniquement s'il respecte la convention SANEF
def _candidate_targets(hostname):
"""Retourne la liste des FQDN candidats à essayer SSH pour ce hostname,
dans l'ordre de priorité (logique SANEF) :
1. **Convention d'abord** : suffixe le plus probable selon le préfixe
(ex: vr*/lr*/sr* .sanef-rec.fr ; vp*/lp*/sp* .sanef.groupe).
2. **FQDN BDD** : si servers.fqdn est renseigné (et commence par hostname),
on l'ajoute en 2ᵉ position. Cas d'usage : serveurs exception qui ne
suivent pas la convention (ex: vrexpbtex1 sur .sanef.groupe).
3. **Autres suffixes** du référentiel DNS, dans l'ordre conventionnel.
Le caller doit essayer SSH sur chaque candidat dans l'ordre, et retourner
le 1er qui aboutit. Permettre le test SSH (et pas juste TCP port 22) évite
les faux positifs (ex: hostname homonyme qui répond port 22 sur la mauvaise
zone DNS)."""
out = []
seen = set()
suffixes = _ordered_suffixes(hostname)
# 1. Convention en premier (1er suffixe selon le préfixe)
if suffixes:
first = hostname + suffixes[0]
out.append(first); seen.add(first.lower())
# 2. FQDN BDD si renseigné et plausible (commence par hostname.)
try:
from ..database import SessionLocal
db = SessionLocal()
@ -128,18 +149,27 @@ def _resolve(hostname):
"AND fqdn IS NOT NULL AND fqdn != ''"
), {"h": hostname}).fetchone()
db.close()
if row and row.fqdn:
if _fqdn_is_consistent(hostname, row.fqdn):
return row.fqdn
log.warning(
f"FQDN BDD '{row.fqdn}' incohérent avec hostname '{hostname}' "
f"(convention SANEF) → fallback sur suffixes ordonnés"
)
if row and row.fqdn and _fqdn_is_consistent(hostname, row.fqdn):
f = row.fqdn.lower()
if f not in seen:
out.append(row.fqdn); seen.add(f)
except Exception:
pass
# 2. Fallback : boucle suffixes DNS (si FQDN manquant en base)
for suffix in _ordered_suffixes(hostname):
# 3. Autres suffixes du référentiel
for suffix in suffixes[1:]:
target = hostname + suffix
if target.lower() not in seen:
out.append(target); seen.add(target.lower())
return out
def _resolve(hostname):
"""Résolution best-effort : retourne le 1er candidat dont le port 22 répond TCP.
Pour un check SSH complet (auth incluse), passer plutôt par _candidate_targets +
boucle _connect côté caller."""
for target in _candidate_targets(hostname):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
@ -152,9 +182,12 @@ def _resolve(hostname):
return None
def _connect_via_psmp(target):
"""Connexion via PSMP CyberArk (auth_interactive avec Vault Password)."""
def _connect_via_psmp(target, errors=None):
"""Connexion via PSMP CyberArk (auth_interactive avec Vault Password).
Si `errors` (list) est fourni, les exceptions y sont append en clair."""
if not PARAMIKO_OK:
if errors is not None:
errors.append("paramiko non disponible")
return None
try:
from .secrets_service import get_secret
@ -167,17 +200,23 @@ def _connect_via_psmp(target):
password = get_secret(db, "ssh_pwd_default_pass") or ""
db.close()
if not password:
if errors is not None:
errors.append("PSMP: ssh_pwd_default_pass non configuré")
return None
username = f"{cyber_user}@{target_user}@{target}"
transport = paramiko.Transport((psmp_host, psmp_port))
transport.start_client(timeout=SSH_TIMEOUT)
transport.auth_interactive(username, lambda t, i, p: [password] * len(p))
if not transport.is_authenticated():
if errors is not None:
errors.append("PSMP: authentification refusée par le proxy")
return None
client = paramiko.SSHClient()
client._transport = transport
return client
except Exception:
except Exception as e:
if errors is not None:
errors.append(f"PSMP: {type(e).__name__}: {e}")
return None
@ -195,34 +234,42 @@ def _resolve_ssh_method(hostname):
return None
def _connect(target, hostname=None):
def _connect(target, hostname=None, errors=None):
"""Établit une session SSH. Si `errors` (list) est fourni, les exceptions
de chaque tentative (PSMP, clé, password) y sont append en clair pour aider
au diagnostic côté UI."""
if not PARAMIKO_OK:
if errors is not None:
errors.append("paramiko non disponible côté serveur PatchCenter")
return None
import os
# Routage PSMP si ssh_method='ssh_psmp' pour ce serveur
method = _resolve_ssh_method(hostname or target)
if method == "ssh_psmp":
client = _connect_via_psmp(target)
client = _connect_via_psmp(target, errors=errors)
if client:
return client
# fallback SSH direct si PSMP KO
ssh_key, ssh_user = _get_ssh_settings()
# 1. Essai clé SSH depuis settings (contenu PEM ou chemin legacy)
key_sources = []
if ssh_key and "BEGIN" in ssh_key and "PRIVATE KEY" in ssh_key:
from io import StringIO
key_sources = [("content", ssh_key)]
elif ssh_key and os.path.exists(ssh_key):
key_sources = [("file", ssh_key)]
elif ssh_key:
if errors is not None:
errors.append(f"Clé SSH configurée mais introuvable / format invalide ({ssh_key[:50]}...)")
if not key_sources and errors is not None:
errors.append("Aucune clé SSH utilisable depuis Settings (ssh_key_private_key vide ou invalide)")
for src_type, src in key_sources:
for loader_file, loader_str in [
(paramiko.Ed25519Key.from_private_key_file, paramiko.Ed25519Key.from_private_key),
(paramiko.RSAKey.from_private_key_file, paramiko.RSAKey.from_private_key),
(paramiko.ECDSAKey.from_private_key_file, paramiko.ECDSAKey.from_private_key),
for loader_name, loader_file, loader_str in [
("Ed25519", paramiko.Ed25519Key.from_private_key_file, paramiko.Ed25519Key.from_private_key),
("RSA", paramiko.RSAKey.from_private_key_file, paramiko.RSAKey.from_private_key),
("ECDSA", paramiko.ECDSAKey.from_private_key_file, paramiko.ECDSAKey.from_private_key),
]:
try:
from io import StringIO
@ -235,7 +282,9 @@ def _connect(target, hostname=None):
client.connect(target, port=22, username=ssh_user, pkey=key,
timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False)
return client
except Exception:
except Exception as e:
if errors is not None:
errors.append(f"Clé {loader_name} ({ssh_user}@{target}:22): {type(e).__name__}: {e}")
continue
# 2. Fallback mot de passe depuis les settings
@ -252,8 +301,11 @@ def _connect(target, hostname=None):
client.connect(target, port=22, username=pwd_user, password=pwd_pass,
timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False)
return client
except Exception:
pass
elif errors is not None:
errors.append("Pas de fallback password (ssh_pwd_default_pass non configuré)")
except Exception as e:
if errors is not None:
errors.append(f"Password ({pwd_user if 'pwd_user' in dir() else ssh_user}@{target}:22): {type(e).__name__}: {e}")
return None