diff --git a/tools/test_psmp.py b/tools/test_psmp.py new file mode 100644 index 0000000..658df5e --- /dev/null +++ b/tools/test_psmp.py @@ -0,0 +1,109 @@ +"""Test manuel connexion PSMP CyberArk. + +Usage: + python tools/test_psmp.py +Example: + python tools/test_psmp.py vposapapp1.sanef.groupe +""" +import os +import sys +import traceback +from sqlalchemy import create_engine, text + +try: + import paramiko +except ImportError: + print("[ERR] pip install paramiko") + sys.exit(1) + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def get_secret(conn, key): + row = conn.execute(text("SELECT value FROM app_secrets WHERE key=:k"), {"k": key}).fetchone() + return row.value if row else None + + +def main(): + if len(sys.argv) < 2: + print("Usage: python tools/test_psmp.py ") + sys.exit(1) + fqdn = sys.argv[1] + + engine = create_engine(DATABASE_URL) + conn = engine.connect() + + psmp_host = get_secret(conn, "psmp_host") or "psmp.sanef.fr" + psmp_port = int(get_secret(conn, "psmp_port") or "22") + cyber_user = get_secret(conn, "psmp_cyberark_user") or "CYBP01336" + target_user = get_secret(conn, "psmp_target_user") or "cybsecope" + password = get_secret(conn, "ssh_pwd_default_pass") + conn.close() + + print(f"[INFO] PSMP: {psmp_host}:{psmp_port}") + print(f"[INFO] Format user: {cyber_user}@{target_user}@{fqdn}") + print(f"[INFO] Password configure: {'OUI' if password else 'NON'}") + if password: + print(f"[INFO] Longueur password: {len(password)} chars") + print() + + if not password: + print("[ERR] ssh_pwd_default_pass vide - configure-le sur /settings onglet SSH password") + return + + # Step 1: socket TCP + import socket + print(f"[STEP 1] Test TCP {psmp_host}:{psmp_port}...") + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(5) + s.connect((psmp_host, psmp_port)) + s.close() + print(" OK TCP reachable") + except Exception as e: + print(f" KO TCP: {e}") + return + + # Step 2: paramiko transport + username = f"{cyber_user}@{target_user}@{fqdn}" + print(f"\n[STEP 2] paramiko Transport + auth_interactive username={username}...") + try: + transport = paramiko.Transport((psmp_host, psmp_port)) + transport.start_client(timeout=15) + + def handler(title, instructions, prompt_list): + print(f" [AUTH PROMPT] title={title!r} instructions={instructions!r}") + print(f" [AUTH PROMPT] prompts={[p[0] for p in prompt_list]}") + return [password] * len(prompt_list) + + transport.auth_interactive(username, handler) + if transport.is_authenticated(): + print(" OK Authentifie") + else: + print(" KO Pas authentifie") + return + except Exception as e: + print(f" KO auth_interactive: {type(e).__name__}: {e}") + traceback.print_exc() + return + + # Step 3: commande test + print("\n[STEP 3] Execute `hostname`...") + try: + channel = transport.open_session() + channel.exec_command("hostname") + out = channel.recv(4096).decode("utf-8", errors="replace") + err = channel.recv_stderr(4096).decode("utf-8", errors="replace") + print(f" STDOUT: {out.strip()}") + if err: + print(f" STDERR: {err.strip()}") + channel.close() + transport.close() + print("\n[OK] Connexion PSMP reussie.") + except Exception as e: + print(f" KO exec: {e}") + + +if __name__ == "__main__": + main()