118 lines
3.7 KiB
Python
118 lines
3.7 KiB
Python
"""Manual connection test against the CyberArk PSMP.

Usage:
    python tools/test_psmp.py <fqdn_cible>

Example:
    python tools/test_psmp.py vposapapp1.sanef.groupe
"""
|
|
import os
|
|
import sys
|
|
import traceback
|
|
from sqlalchemy import create_engine, text
|
|
|
|
try:
|
|
import paramiko
|
|
except ImportError:
|
|
print("[ERR] pip install paramiko")
|
|
sys.exit(1)
|
|
|
|
# Database URL resolution order: DATABASE_URL_DEMO, then DATABASE_URL, then a
# local demo default. An empty environment value counts as unset because the
# chain uses `or`.
# NOTE(review): the fallback URL embeds a credential in source; prefer setting
# one of the environment variables outside of local demo use.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)
|
|
|
|
|
|
def get_secret(conn, key):
    """Read the ``app_secrets`` value stored under *key* and decrypt it.

    Decryption is delegated to ``app.services.secrets_service.decrypt``
    (Fernet-based, according to the original author's note).

    Args:
        conn: Open database connection exposing ``execute``.
        key: Value matched against the ``key`` column of ``app_secrets``.

    Returns:
        The decrypted value, or None when no row matches or the stored
        value is empty. If ``decrypt`` raises, the raw stored value is
        returned unchanged, on the assumption it was saved in cleartext.
    """
    from app.services.secrets_service import decrypt

    query = text("SELECT value FROM app_secrets WHERE key=:k")
    record = conn.execute(query, {"k": key}).fetchone()
    stored = record.value if record else None
    if not stored:
        return None

    try:
        return decrypt(stored)
    except Exception:
        # Deliberate best-effort: a value that cannot be decrypted is
        # treated as having been stored in cleartext.
        return stored
|
|
|
|
|
|
def main():
    """Run a manual, step-by-step PSMP connection check for one target host.

    The target FQDN is read from ``sys.argv[1]``. PSMP settings (host, port,
    CyberArk user, target user, password) are read from ``app_secrets`` via
    ``get_secret``, with hard-coded defaults for everything but the password.

    Steps, each reported on stdout:
        1. Plain TCP connect to the PSMP host/port (5 s timeout).
        2. paramiko ``Transport`` + keyboard-interactive authentication with
           the ``<cyberark_user>@<target_user>@<fqdn>`` username.
        3. Execute ``hostname`` over a session channel and print its output.

    The function returns early (without raising) as soon as a step fails.
    The database connection, probe socket, SSH channel and SSH transport are
    released on every path, including failures.

    Raises:
        SystemExit: with status 1 when no FQDN argument is given.
    """
    if len(sys.argv) < 2:
        print("Usage: python tools/test_psmp.py <fqdn_cible>")
        sys.exit(1)
    fqdn = sys.argv[1]

    engine = create_engine(DATABASE_URL)
    # The context manager closes the connection even if a lookup or the
    # int() conversion of the port raises.
    with engine.connect() as conn:
        psmp_host = get_secret(conn, "psmp_host") or "psmp.sanef.fr"
        psmp_port = int(get_secret(conn, "psmp_port") or "22")
        cyber_user = get_secret(conn, "psmp_cyberark_user") or "CYBP01336"
        target_user = get_secret(conn, "psmp_target_user") or "cybsecope"
        password = get_secret(conn, "ssh_pwd_default_pass")

    print(f"[INFO] PSMP: {psmp_host}:{psmp_port}")
    print(f"[INFO] Format user: {cyber_user}@{target_user}@{fqdn}")
    print(f"[INFO] Password configure: {'OUI' if password else 'NON'}")
    if password:
        print(f"[INFO] Longueur password: {len(password)} chars")
    print()

    if not password:
        print("[ERR] ssh_pwd_default_pass vide - configure-le sur /settings onglet SSH password")
        return

    # Step 1: raw TCP reachability
    import socket
    print(f"[STEP 1] Test TCP {psmp_host}:{psmp_port}...")
    try:
        # `with` closes the socket even when connect() fails or times out.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            s.connect((psmp_host, psmp_port))
        print(" OK TCP reachable")
    except Exception as e:
        print(f" KO TCP: {e}")
        return

    username = f"{cyber_user}@{target_user}@{fqdn}"
    transport = None
    try:
        # Step 2: paramiko transport + keyboard-interactive auth
        print(f"\n[STEP 2] paramiko Transport + auth_interactive username={username}...")
        try:
            transport = paramiko.Transport((psmp_host, psmp_port))
            transport.start_client(timeout=15)

            def handler(title, instructions, prompt_list):
                # Echo what the server asks, then answer every prompt with
                # the same password.
                print(f" [AUTH PROMPT] title={title!r} instructions={instructions!r}")
                print(f" [AUTH PROMPT] prompts={[p[0] for p in prompt_list]}")
                return [password] * len(prompt_list)

            transport.auth_interactive(username, handler)
            if transport.is_authenticated():
                print(" OK Authentifie")
            else:
                print(" KO Pas authentifie")
                return
        except Exception as e:
            print(f" KO auth_interactive: {type(e).__name__}: {e}")
            traceback.print_exc()
            return

        # Step 3: run a trivial command through the session
        print("\n[STEP 3] Execute `hostname`...")
        try:
            channel = transport.open_session()
            try:
                channel.exec_command("hostname")
                out = channel.recv(4096).decode("utf-8", errors="replace")
                err = channel.recv_stderr(4096).decode("utf-8", errors="replace")
            finally:
                channel.close()
            print(f" STDOUT: {out.strip()}")
            if err:
                print(f" STDERR: {err.strip()}")
        except Exception as e:
            print(f" KO exec: {e}")
            return
    finally:
        # Release the SSH transport on every exit path; it is None only if
        # the Transport constructor itself failed.
        if transport is not None:
            transport.close()

    print("\n[OK] Connexion PSMP reussie.")
|
|
|
|
|
# Script entry point: run the check only when executed directly, not when
# the module is imported.
if __name__ == "__main__":
    main()
|