"""Service prereq QuickWin — resolution DNS, SSH, disque, satellite Adapte au contexte SANEF : PSMP CyberArk + SSH key selon methode serveur""" import socket import logging import os log = logging.getLogger("quickwin.prereq") try: import paramiko PARAMIKO_OK = True except ImportError: PARAMIKO_OK = False log.warning("paramiko non disponible — checks SSH impossibles") # --- Constantes --- DOMP = "sanef.groupe" # domaine prod/preprod/dev DOMR = "sanef-rec.fr" # domaine recette/test PSMP_HOST = "psmp.sanef.fr" CYBR_USER = "CYBP01336" TARGET_USER = "cybsecope" SSH_KEY_FILE_DEFAULT = "/opt/patchcenter/keys/id_rsa_cybglobal.pem" SSH_TIMEOUT = 15 # Seuils disque (% utilise) DISK_MAX_PCT = 90 # >90% = KO # Banniere CyberArk a filtrer BANNER_FILTERS = [ "GROUPE SANEF", "propriete du Groupe", "accederait", "emprisonnement", "Article 323", "code penal", "Authorized uses only", "CyberArk", "This session", "session is being", ] def _get_settings(db): """Charge les settings utiles depuis la table settings""" from sqlalchemy import text rows = db.execute(text( "SELECT key, value FROM settings WHERE key IN " "('psmp_host','default_ssh_timeout','disk_min_root_mb')" )).fetchall() return {r.key: r.value for r in rows} def _get_secret(db, key): """Recupere un secret dechiffre depuis app_secrets""" try: from ..services.secrets_service import get_secret return get_secret(db, key) except Exception: return None # ========================================================= # 1. RESOLUTION DNS # ========================================================= def _detect_env(hostname): """Detecte l'environnement par la 2e lettre du hostname (convention SANEF) p=prod, i=preprod, r=recette, v/t=test, d=dev""" if len(hostname) < 2: return "unknown" c = hostname[1].lower() if c == "p": return "prod" elif c == "i": return "preprod" elif c == "r": return "recette" elif c in ("v", "t"): return "test" elif c == "d": return "dev" return "unknown" def _resolve_fqdn(hostname, domain_ltd=None, env_code=None): """Resout le hostname en FQDN testable. Retourne (fqdn, None) ou (None, error_msg). Logique: - Prod/Preprod/Dev: domp d'abord, puis domr - Recette/Test: domr d'abord, puis domp - Utilise domain_ltd si dispo, sinon detection par hostname """ if "." in hostname: # Deja un FQDN if _dns_resolves(hostname): return hostname, None return None, f"FQDN {hostname} non resolvable" # Determiner l'ordre des domaines env = _detect_env(hostname) if env_code: ec = env_code.upper() if ec in ("PRD", "PPR", "DEV"): env = "prod" elif ec in ("REC",): env = "recette" elif ec in ("TES", "TS1", "TS2"): env = "test" if env in ("prod", "preprod", "dev"): domains_order = [DOMP, DOMR] elif env in ("recette", "test"): domains_order = [DOMR, DOMP] else: # Fallback: utiliser domain_ltd si dispo if domain_ltd and domain_ltd.strip(): alt = DOMR if domain_ltd.strip() == DOMP else DOMP domains_order = [domain_ltd.strip(), alt] else: domains_order = [DOMP, DOMR] # Tenter resolution dans l'ordre for dom in domains_order: fqdn = f"{hostname}.{dom}" if _dns_resolves(fqdn): return fqdn, None return None, f"DNS KO: {hostname} non resolu ({'/'.join(domains_order)})" def _dns_resolves(fqdn): """Verifie si un FQDN se resout en IP""" try: socket.getaddrinfo(fqdn, 22, socket.AF_INET, socket.SOCK_STREAM) return True except (socket.gaierror, socket.herror, OSError): return False # ========================================================= # 2. TEST SSH # ========================================================= def _get_ssh_key_path(db=None): """Retourne le chemin de la cle SSH. Cherche d'abord dans app_secrets (ssh_key_file), puis fallback sur le chemin par defaut.""" if db: secret_path = _get_secret(db, "ssh_key_file") if secret_path and secret_path.strip() and os.path.exists(secret_path.strip()): return secret_path.strip() if os.path.exists(SSH_KEY_FILE_DEFAULT): return SSH_KEY_FILE_DEFAULT return None def _load_ssh_key(db=None): """Charge la cle SSH privee depuis le chemin configure en base ou par defaut""" key_path = _get_ssh_key_path(db) if not key_path: return None for cls in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey]: try: return cls.from_private_key_file(key_path) except Exception: continue return None def _ssh_via_psmp(fqdn, password): """Connexion SSH via PSMP CyberArk (interactive auth). Username format: CYBP01336@cybsecope@fqdn""" if not password: return None, "PSMP: pas de mot de passe configure" try: username = f"{CYBR_USER}@{TARGET_USER}@{fqdn}" transport = paramiko.Transport((PSMP_HOST, 22)) transport.connect() def handler(title, instructions, prompt_list): return [password] * len(prompt_list) transport.auth_interactive(username, handler) client = paramiko.SSHClient() client._transport = transport return client, None except Exception as e: return None, f"PSMP: {str(e)[:120]}" def _ssh_via_key(fqdn, ssh_user, key): """Connexion SSH directe par cle""" if not key: return None, "SSH key: cle non trouvee" if not ssh_user: return None, "SSH key: user non configure" try: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(fqdn, port=22, username=ssh_user, pkey=key, timeout=SSH_TIMEOUT, look_for_keys=False, allow_agent=False) return client, None except Exception as e: return None, f"SSH key: {str(e)[:120]}" def _ssh_connect(fqdn, ssh_method, db): """Connecte au serveur selon la methode (ssh_psmp ou ssh_key). Retourne (client, error_msg)""" if not PARAMIKO_OK: return None, "paramiko non installe" if ssh_method == "ssh_psmp": # PSMP: password depuis app_secrets password = _get_secret(db, "ssh_pwd_default_pass") client, err = _ssh_via_psmp(fqdn, password) if client: return client, None # Fallback: tenter par cle key = _load_ssh_key(db) ssh_user = _get_secret(db, "ssh_pwd_default_user") or TARGET_USER client2, err2 = _ssh_via_key(fqdn, ssh_user, key) if client2: return client2, None return None, err # retourner l'erreur PSMP originale else: # ssh_key: user depuis secrets, cle depuis fichier ssh_user = _get_secret(db, "ssh_pwd_default_user") or TARGET_USER key = _load_ssh_key(db) client, err = _ssh_via_key(fqdn, ssh_user, key) if client: return client, None # Fallback: tenter via PSMP password = _get_secret(db, "ssh_pwd_default_pass") if password: client2, err2 = _ssh_via_psmp(fqdn, password) if client2: return client2, None return None, err def _ssh_exec(client, cmd, timeout=12): """Execute une commande via SSH et retourne (stdout, stderr, returncode). Filtre les bannieres CyberArk.""" try: chan = client._transport.open_session() chan.settimeout(timeout) chan.exec_command(cmd) out = b"" err = b"" while True: try: chunk = chan.recv(8192) if not chunk: break out += chunk except Exception: break try: err = chan.recv_stderr(8192) except Exception: pass rc = chan.recv_exit_status() chan.close() # Filtrer bannieres out_str = out.decode("utf-8", errors="replace") lines = [l for l in out_str.splitlines() if not any(b in l for b in BANNER_FILTERS)] return "\n".join(lines), err.decode("utf-8", errors="replace"), rc except Exception as e: return "", str(e), -1 # ========================================================= # 3. TEST ESPACE DISQUE # ========================================================= def _check_disk(client): """Verifie l'espace disque / et /var via sudo df. Retourne (ok, detail_msg)""" out, err, rc = _ssh_exec(client, "sudo df / /var --output=target,pcent 2>/dev/null | tail -n +2") if rc != 0 or not out.strip(): return True, "Disque: non verifie (df echoue)" ok = True parts = [] for line in out.strip().splitlines(): tokens = line.split() if len(tokens) >= 2: mount = tokens[0] pct_str = tokens[-1].replace("%", "").strip() if pct_str.isdigit(): pct = int(pct_str) if pct >= DISK_MAX_PCT: ok = False parts.append(f"{mount}={pct}% KO") else: parts.append(f"{mount}={pct}%") if not parts: return True, "Disque: non verifie" return ok, "Disque: " + ", ".join(parts) # ========================================================= # 4. TEST SATELLITE / YUM REPOS # ========================================================= def _check_satellite(client): """Verifie l'enregistrement Satellite et les repos YUM. Retourne (ok, detail_msg)""" # Tenter subscription-manager d'abord out, err, rc = _ssh_exec(client, "sudo subscription-manager status 2>/dev/null | head -5") if rc == 0 and "Current" in out: return True, "Satellite: OK (subscription-manager)" # Fallback: yum repolist out2, err2, rc2 = _ssh_exec(client, "sudo yum repolist 2>/dev/null | tail -1") if rc2 == 0 and out2.strip(): line = out2.strip() # Si "repolist: 0" => pas de repos if "repolist: 0" in line.lower(): return False, "Satellite: KO (0 repos)" return True, f"Satellite: OK ({line[:60]})" return False, "Satellite: KO (pas de repos ni subscription)" # ========================================================= # ORCHESTRATEUR PRINCIPAL # ========================================================= def check_server_prereqs(hostname, db, domain_ltd=None, env_code=None, ssh_method="ssh_key"): """Verification complete des prerequis d'un serveur QuickWin. Etapes: 1. Resolution DNS (domp/domr selon env) 2. Test SSH (PSMP ou key selon ssh_method) 3. Espace disque (sudo df) 4. Satellite/YUM Retourne dict: dns_ok, ssh_ok, disk_ok, satellite_ok, fqdn, detail, skip (True si serveur a ignorer) """ result = { "dns_ok": False, "ssh_ok": False, "disk_ok": False, "satellite_ok": False, "fqdn": "", "detail": "", "skip": False, } detail_parts = [] # 1. Resolution DNS fqdn, dns_err = _resolve_fqdn(hostname, domain_ltd, env_code) if not fqdn: result["detail"] = dns_err result["skip"] = True log.warning(f"[{hostname}] {dns_err}") return result result["dns_ok"] = True result["fqdn"] = fqdn detail_parts.append(f"DNS: OK ({fqdn})") # 2. Test SSH client, ssh_err = _ssh_connect(fqdn, ssh_method, db) if not client: detail_parts.append(f"SSH: KO ({ssh_err})") result["detail"] = " | ".join(detail_parts) result["skip"] = True log.warning(f"[{hostname}] SSH KO: {ssh_err}") return result result["ssh_ok"] = True method_label = "PSMP" if ssh_method == "ssh_psmp" else "key" detail_parts.append(f"SSH: OK ({method_label})") try: # 3. Espace disque disk_ok, disk_detail = _check_disk(client) result["disk_ok"] = disk_ok detail_parts.append(disk_detail) # 4. Satellite sat_ok, sat_detail = _check_satellite(client) result["satellite_ok"] = sat_ok detail_parts.append(sat_detail) finally: try: client.close() except Exception: pass result["detail"] = " | ".join(detail_parts) return result