# patchcenter/tools/test_psmp.py
#
# 122 lines
# 3.9 KiB
# Python

"""Test manuel connexion PSMP CyberArk.
Usage:
    python tools/test_psmp.py <fqdn_cible>

Example:
    python tools/test_psmp.py vposapapp1.sanef.groupe
"""
import os
import sys
import traceback
from sqlalchemy import create_engine, text
# paramiko is required for the SSH steps of this script; without it there is
# nothing useful to run, so print an install hint and exit with status 1.
try:
    import paramiko
except ImportError:
    print("[ERR] pip install paramiko")
    sys.exit(1)
# Database connection string. Resolution order: the DATABASE_URL_DEMO
# environment variable, then DATABASE_URL, then a localhost default.
# NOTE(review): the fallback URL embeds a literal password - presumably
# demo-only credentials; confirm before using this script elsewhere.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)
def get_secret(conn, key):
    """Read one entry of the ``app_secrets`` table and Fernet-decrypt it.

    The Fernet key is built from the ``SECRET_KEY`` environment variable (or
    a built-in default): its UTF-8 bytes are truncated to 32 bytes, padded
    with NUL bytes up to 32, then urlsafe-base64 encoded. The original
    docstring states this is the same derivation as
    app/services/secrets_service.py.

    Args:
        conn: open SQLAlchemy connection used to run the lookup.
        key: value matched against the ``key`` column of ``app_secrets``.

    Returns:
        ``None`` when there is no row or its value is empty; the decrypted
        text when decryption succeeds; otherwise the stored value unchanged
        (any decryption error falls back to the raw value).
    """
    import base64
    from cryptography.fernet import Fernet

    query = text("SELECT value FROM app_secrets WHERE key=:k")
    record = conn.execute(query, {"k": key}).fetchone()
    if not (record and record.value):
        return None

    passphrase = os.getenv(
        "SECRET_KEY",
        "slpm-patchcenter-secret-key-2026-change-in-production",
    )
    key_material = passphrase.encode()[:32].ljust(32, b'\0')
    cipher = Fernet(base64.urlsafe_b64encode(key_material))

    try:
        plaintext = cipher.decrypt(record.value.encode()).decode()
    except Exception:
        # Best-effort: a value that cannot be decrypted is returned as stored.
        return record.value
    return plaintext
def main():
    """Run a three-step manual connectivity check through the PSMP.

    The target FQDN is read from ``sys.argv[1]``. PSMP host, port, CyberArk
    user, target user and password are loaded from ``app_secrets`` through
    ``get_secret`` (host/port/users have built-in fallbacks), then:

    1. a plain TCP connection to the PSMP host/port is attempted;
    2. a paramiko ``Transport`` is started and keyboard-interactive
       authentication is performed as ``<cyber_user>@<target_user>@<fqdn>``,
       answering every prompt with the configured password;
    3. ``hostname`` is executed on the authenticated session.

    Progress and failures are printed to stdout. The function returns
    ``None`` after the first failing step; a missing command-line argument
    exits with status 1. The DB connection, probe socket, SSH channel and
    transport are always released, including on failure paths.
    """
    if len(sys.argv) < 2:
        print("Usage: python tools/test_psmp.py <fqdn_cible>")
        sys.exit(1)
    fqdn = sys.argv[1]

    engine = create_engine(DATABASE_URL)
    try:
        # Context manager closes the connection even if a lookup or the
        # int() conversion of the port raises.
        with engine.connect() as conn:
            psmp_host = get_secret(conn, "psmp_host") or "psmp.sanef.fr"
            psmp_port = int(get_secret(conn, "psmp_port") or "22")
            cyber_user = get_secret(conn, "psmp_cyberark_user") or "CYBP01336"
            target_user = get_secret(conn, "psmp_target_user") or "cybsecope"
            password = get_secret(conn, "ssh_pwd_default_pass")
    finally:
        # The DB is not needed past this point; release pooled connections.
        engine.dispose()

    print(f"[INFO] PSMP: {psmp_host}:{psmp_port}")
    print(f"[INFO] Format user: {cyber_user}@{target_user}@{fqdn}")
    print(f"[INFO] Password configure: {'OUI' if password else 'NON'}")
    if password:
        print(f"[INFO] Longueur password: {len(password)} chars")
    print()
    if not password:
        print("[ERR] ssh_pwd_default_pass vide - configure-le sur /settings onglet SSH password")
        return

    # Step 1: raw TCP reachability
    import socket
    print(f"[STEP 1] Test TCP {psmp_host}:{psmp_port}...")
    try:
        # "with" closes the socket on both the success and the failure path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            s.connect((psmp_host, psmp_port))
        print(" OK TCP reachable")
    except Exception as e:
        print(f" KO TCP: {e}")
        return

    # Step 2: paramiko transport + keyboard-interactive authentication
    username = f"{cyber_user}@{target_user}@{fqdn}"
    print(f"\n[STEP 2] paramiko Transport + auth_interactive username={username}...")

    def handler(title, instructions, prompt_list):
        # Log what the server asks, then answer every prompt with the password.
        print(f" [AUTH PROMPT] title={title!r} instructions={instructions!r}")
        print(f" [AUTH PROMPT] prompts={[p[0] for p in prompt_list]}")
        return [password] * len(prompt_list)

    transport = None
    try:
        try:
            transport = paramiko.Transport((psmp_host, psmp_port))
            transport.start_client(timeout=15)
            transport.auth_interactive(username, handler)
            if transport.is_authenticated():
                print(" OK Authentifie")
            else:
                print(" KO Pas authentifie")
                return
        except Exception as e:
            print(f" KO auth_interactive: {type(e).__name__}: {e}")
            traceback.print_exc()
            return

        # Step 3: run a trivial command on the authenticated session
        print("\n[STEP 3] Execute `hostname`...")
        channel = None
        try:
            channel = transport.open_session()
            # Bound the reads so an unanswered exec request cannot hang the
            # script; a timeout is reported through the "KO exec" branch.
            channel.settimeout(15)
            channel.exec_command("hostname")
            out = channel.recv(4096).decode("utf-8", errors="replace")
            err = channel.recv_stderr(4096).decode("utf-8", errors="replace")
            print(f" STDOUT: {out.strip()}")
            if err:
                print(f" STDERR: {err.strip()}")
        except Exception as e:
            print(f" KO exec: {e}")
        else:
            print("\n[OK] Connexion PSMP reussie.")
        finally:
            if channel is not None:
                channel.close()
    finally:
        # Always tear down the SSH transport, whatever step failed.
        if transport is not None:
            transport.close()
# Run the check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()