Add test_ldap: diagnostic LDAP/AD step-by-step (bind admin + search user + bind user)
This commit is contained in:
parent
bfc996e50e
commit
d72d4a711f
135
tools/test_ldap.py
Normal file
135
tools/test_ldap.py
Normal file
@ -0,0 +1,135 @@
|
||||
"""Test LDAP/AD step-by-step (bind admin + search user + bind user).
|
||||
|
||||
Lit la config depuis app_secrets (PatchCenter). Optionnel : --user/--pass
|
||||
pour tester un compte utilisateur après le bind admin.
|
||||
|
||||
Usage:
|
||||
python tools/test_ldap.py # test bind admin uniquement
|
||||
python tools/test_ldap.py --user moutaouakil-ext # test bind admin + recherche user
|
||||
python tools/test_ldap.py --user xx --pass yy # + bind user (verifie mdp)
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
import base64
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
|
||||
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
|
||||
|
||||
try:
|
||||
from ldap3 import Server, Connection, ALL
|
||||
from cryptography.fernet import Fernet
|
||||
except ImportError:
|
||||
print("[ERR] pip install ldap3 cryptography")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_secret(conn, key):
|
||||
secret = os.getenv("SECRET_KEY",
|
||||
"slpm-patchcenter-secret-key-2026-change-in-production")
|
||||
raw = secret.encode()[:32].ljust(32, b"\0")
|
||||
f = Fernet(base64.urlsafe_b64encode(raw))
|
||||
row = conn.execute(text("SELECT value FROM app_secrets WHERE key=:k"), {"k": key}).fetchone()
|
||||
if not row or not row.value:
|
||||
return None
|
||||
try:
|
||||
return f.decrypt(row.value.encode()).decode()
|
||||
except Exception:
|
||||
return row.value
|
||||
|
||||
|
||||
def main():
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument("--user", help="Username AD a tester (sAMAccountName)")
|
||||
p.add_argument("--pass", dest="pwd", help="Mot de passe user a tester")
|
||||
args = p.parse_args()
|
||||
|
||||
engine = create_engine(DATABASE_URL)
|
||||
conn = engine.connect()
|
||||
cfg = {
|
||||
"server": get_secret(conn, "ldap_server"),
|
||||
"base_dn": get_secret(conn, "ldap_base_dn"),
|
||||
"bind_dn": get_secret(conn, "ldap_bind_dn"),
|
||||
"bind_pwd": get_secret(conn, "ldap_bind_pwd"),
|
||||
"user_filter": get_secret(conn, "ldap_user_filter") or "(sAMAccountName={username})",
|
||||
"email_attr": get_secret(conn, "ldap_email_attr") or "mail",
|
||||
"name_attr": get_secret(conn, "ldap_name_attr") or "displayName",
|
||||
}
|
||||
conn.close()
|
||||
|
||||
print(f"[INFO] Server : {cfg['server']}")
|
||||
print(f"[INFO] Base DN : {cfg['base_dn']}")
|
||||
print(f"[INFO] Bind DN : {cfg['bind_dn']}")
|
||||
print(f"[INFO] Bind pwd: {'***configure***' if cfg['bind_pwd'] else 'VIDE'}")
|
||||
print(f"[INFO] User filter: {cfg['user_filter']}")
|
||||
print()
|
||||
|
||||
if not cfg["server"] or not cfg["bind_dn"] or not cfg["bind_pwd"]:
|
||||
print("[ERR] Config incomplete dans app_secrets. Configure /settings onglet LDAP d'abord.")
|
||||
return
|
||||
|
||||
use_ssl = cfg["server"].startswith("ldaps://")
|
||||
server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl)
|
||||
|
||||
# Step 1: bind admin
|
||||
print("[STEP 1] Bind compte admin...")
|
||||
try:
|
||||
c = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True)
|
||||
print(f" OK Bind admin reussi. Server info: {server.info.naming_contexts if server.info else 'N/A'}")
|
||||
except Exception as e:
|
||||
print(f" KO Bind admin echec: {type(e).__name__}: {e}")
|
||||
return
|
||||
|
||||
if not args.user:
|
||||
c.unbind()
|
||||
print("\n[OK] Bind admin OK. Lance avec --user <login> pour tester la recherche.")
|
||||
return
|
||||
|
||||
# Step 2: search user
|
||||
print(f"\n[STEP 2] Recherche user {args.user}...")
|
||||
user_filter = cfg["user_filter"].replace("{username}", args.user)
|
||||
try:
|
||||
c.search(cfg["base_dn"], user_filter,
|
||||
attributes=[cfg["email_attr"], cfg["name_attr"], "distinguishedName", "memberOf"])
|
||||
except Exception as e:
|
||||
c.unbind()
|
||||
print(f" KO Search echec: {e}")
|
||||
return
|
||||
|
||||
if not c.entries:
|
||||
c.unbind()
|
||||
print(f" KO User '{args.user}' introuvable (filtre: {user_filter})")
|
||||
return
|
||||
|
||||
entry = c.entries[0]
|
||||
user_dn = entry.entry_dn
|
||||
email = str(getattr(entry, cfg["email_attr"], ""))
|
||||
name = str(getattr(entry, cfg["name_attr"], ""))
|
||||
print(f" OK Trouve:")
|
||||
print(f" DN : {user_dn}")
|
||||
print(f" Email : {email}")
|
||||
print(f" Name : {name}")
|
||||
if hasattr(entry, "memberOf"):
|
||||
groups = list(entry.memberOf.values) if entry.memberOf else []
|
||||
print(f" Groups: {len(groups)} appartenances")
|
||||
for g in groups[:5]:
|
||||
print(f" - {g}")
|
||||
c.unbind()
|
||||
|
||||
if not args.pwd:
|
||||
print("\n[OK] Recherche OK. Lance avec --pass pour tester l'auth.")
|
||||
return
|
||||
|
||||
# Step 3: bind user
|
||||
print(f"\n[STEP 3] Bind user {args.user} avec mdp fourni...")
|
||||
try:
|
||||
uc = Connection(server, user=user_dn, password=args.pwd, auto_bind=True)
|
||||
uc.unbind()
|
||||
print(" OK Authentification reussie.")
|
||||
except Exception as e:
|
||||
print(f" KO Auth echec: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user