"""Manual connectivity test for the CyberArk PSMP SSH proxy.

Usage:
    python tools/test_psmp.py <fqdn>

Example:
    python tools/test_psmp.py vposapapp1.sanef.groupe

The script reads the PSMP settings from the ``app_secrets`` table and then
runs three checks in order: a raw TCP connect to the PSMP host, a paramiko
keyboard-interactive authentication, and a remote ``hostname`` command.
Progress and failures are reported on stdout.
"""
import base64
import os
import socket
import sys
import traceback

from sqlalchemy import create_engine, text

try:
    import paramiko
except ImportError:
    print("[ERR] pip install paramiko")
    sys.exit(1)

# Connection string resolution order: DATABASE_URL_DEMO, then DATABASE_URL,
# then a local default.
# NOTE(review): the last fallback embeds credentials in source; confirm this
# is acceptable for the environments where this tool is run.
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"


def get_secret(conn, key):
    """Read one value from ``app_secrets`` and Fernet-decrypt it.

    The Fernet key is built from the ``SECRET_KEY`` environment variable
    (or a built-in default): its encoded bytes are truncated to 32,
    right-padded with NUL bytes, then urlsafe-base64 encoded. The original
    author notes this is the same derivation as
    app/services/secrets_service.py.

    Args:
        conn: Open SQLAlchemy connection used to run the lookup.
        key: Value matched against the ``key`` column of ``app_secrets``.

    Returns:
        The decrypted string; ``None`` when there is no row or the stored
        value is empty; the stored value unchanged when decryption fails.
    """
    # Imported lazily, as in the original, so the dependency is only needed
    # once a secret is actually read.
    from cryptography.fernet import Fernet

    secret_key = os.getenv("SECRET_KEY", "slpm-patchcenter-secret-key-2026-change-in-production")
    raw = secret_key.encode()[:32].ljust(32, b'\0')
    fernet = Fernet(base64.urlsafe_b64encode(raw))
    row = conn.execute(text("SELECT value FROM app_secrets WHERE key=:k"), {"k": key}).fetchone()
    if not row or not row.value:
        return None
    try:
        return fernet.decrypt(row.value.encode()).decode()
    except Exception:
        # Deliberate best-effort: anything that cannot be decrypted is
        # handed back as stored (presumably a value saved in clear text;
        # verify against the writer side).
        return row.value


def _load_psmp_settings():
    """Fetch the PSMP connection settings from the database.

    Returns:
        Tuple ``(psmp_host, psmp_port, cyber_user, target_user, password)``.
        Every field except ``password`` falls back to a built-in default
        when the secret is missing; ``password`` may be ``None``.
    """
    engine = create_engine(DATABASE_URL)
    try:
        # The context manager closes the connection even if a lookup raises.
        with engine.connect() as conn:
            psmp_host = get_secret(conn, "psmp_host") or "psmp.sanef.fr"
            psmp_port = int(get_secret(conn, "psmp_port") or "22")
            cyber_user = get_secret(conn, "psmp_cyberark_user") or "CYBP01336"
            target_user = get_secret(conn, "psmp_target_user") or "cybsecope"
            password = get_secret(conn, "ssh_pwd_default_pass")
    finally:
        # Release the engine's pooled connections; it is not used again.
        engine.dispose()
    return psmp_host, psmp_port, cyber_user, target_user, password


def _tcp_reachable(host, port):
    """Try a plain IPv4 TCP connect (5 s timeout) and report the outcome.

    Returns:
        ``True`` when the connect succeeded, ``False`` otherwise.
    """
    try:
        # ``with`` closes the socket on the failure path too.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            sock.connect((host, port))
        print(" OK TCP reachable")
        return True
    except Exception as e:
        # Diagnostic boundary: any failure is reported, never raised.
        print(f" KO TCP: {e}")
        return False


def _authenticate(transport, username, password):
    """Start the SSH client and answer every interactive prompt with ``password``.

    Returns:
        The result of ``transport.is_authenticated()`` after the exchange.
    """
    transport.start_client(timeout=15)

    def handler(title, instructions, prompt_list):
        # Echo what the server asks so the operator can see the prompts.
        print(f" [AUTH PROMPT] title={title!r} instructions={instructions!r}")
        print(f" [AUTH PROMPT] prompts={[p[0] for p in prompt_list]}")
        return [password] * len(prompt_list)

    transport.auth_interactive(username, handler)
    return transport.is_authenticated()


def _run_hostname(transport):
    """Run ``hostname`` over a new session channel and print its output."""
    channel = transport.open_session()
    try:
        channel.exec_command("hostname")
        out = channel.recv(4096).decode("utf-8", errors="replace")
        err = channel.recv_stderr(4096).decode("utf-8", errors="replace")
        print(f" STDOUT: {out.strip()}")
        if err:
            print(f" STDERR: {err.strip()}")
    finally:
        channel.close()


def main():
    """Run the three-step PSMP connectivity check for the FQDN in ``sys.argv[1]``.

    Exits with status 1 when the FQDN argument is missing. Every other
    failure is printed and the function returns normally.
    """
    if len(sys.argv) < 2:
        print("Usage: python tools/test_psmp.py <fqdn>")
        sys.exit(1)
    fqdn = sys.argv[1]

    psmp_host, psmp_port, cyber_user, target_user, password = _load_psmp_settings()

    print(f"[INFO] PSMP: {psmp_host}:{psmp_port}")
    print(f"[INFO] Format user: {cyber_user}@{target_user}@{fqdn}")
    print(f"[INFO] Password configure: {'OUI' if password else 'NON'}")
    if password:
        print(f"[INFO] Longueur password: {len(password)} chars")
    print()

    if not password:
        print("[ERR] ssh_pwd_default_pass vide - configure-le sur /settings onglet SSH password")
        return

    # Step 1: TCP socket
    print(f"[STEP 1] Test TCP {psmp_host}:{psmp_port}...")
    if not _tcp_reachable(psmp_host, psmp_port):
        return

    # Step 2: paramiko transport
    username = f"{cyber_user}@{target_user}@{fqdn}"
    print(f"\n[STEP 2] paramiko Transport + auth_interactive username={username}...")
    transport = None
    try:
        try:
            transport = paramiko.Transport((psmp_host, psmp_port))
            authenticated = _authenticate(transport, username, password)
        except Exception as e:
            print(f" KO auth_interactive: {type(e).__name__}: {e}")
            traceback.print_exc()
            return
        if not authenticated:
            print(" KO Pas authentifie")
            return
        print(" OK Authentifie")

        # Step 3: test command
        print("\n[STEP 3] Execute `hostname`...")
        try:
            _run_hostname(transport)
            print("\n[OK] Connexion PSMP reussie.")
        except Exception as e:
            print(f" KO exec: {e}")
    finally:
        # Close the transport on every exit path, not only after a
        # successful step 3.
        if transport is not None:
            transport.close()


if __name__ == "__main__":
    main()