patchcenter/app/services/ldap_service.py
Admin MPCZ 8479d7280e Users/Contacts: workflow profils + LDAP + sync iTop + etat aligne
- Users: 4 profils (admin/coordinator/operator/viewer) remplacent la matrix
- /users/add: picker contacts iTop (plus de creation libre)
- /me/change-password: flow force_password_change
- LDAP: service + section settings + option login
- Sync iTop contacts: filtre par teams (SecOps/iPOP/Externe/DSI/Admin DSI)
- Auto-desactivation users si contact inactif
- etat: alignement sur enum iTop (production/implementation/stock/obsolete)
- Menu: Contacts dans Administration, Serveurs en groupe repliable
- Audit bases: demo/prod via JWT mode

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:50:43 +02:00

114 lines
4.1 KiB
Python

"""LDAP/AD service — authentication against the directory.
Configuration is read from the following setting keys:
- ldap_enabled : "true" / "false"
- ldap_server : server URL (e.g. ldaps://ad.sanef.com:636)
- ldap_base_dn : base DN (e.g. DC=sanef,DC=com)
- ldap_bind_dn : DN of the bind account (e.g. CN=svc_pc,OU=SVC,DC=sanef,DC=com)
- ldap_bind_pwd : bind password (stored encrypted via secrets_service)
- ldap_user_filter : user search filter (e.g. (sAMAccountName={username}))
- ldap_tls : "true" / "false"
- ldap_email_attr : attribute holding the e-mail address (default: mail)
- ldap_name_attr : attribute holding the display name (default: displayName)
"""
import logging
from sqlalchemy import text
log = logging.getLogger(__name__)
try:
from ldap3 import Server, Connection, ALL, NTLM, SIMPLE, Tls
import ssl
LDAP_OK = True
except ImportError:
LDAP_OK = False
def _get_config(db):
    """Build the LDAP configuration dict from stored settings.

    Every value is fetched through ``secrets_service.get_secret``; a missing
    or empty value falls back to the default listed below.

    Args:
        db: handle forwarded unchanged to ``get_secret``.

    Returns:
        dict with keys ``enabled`` and ``tls`` (booleans, true only when the
        stored text equals "true" case-insensitively) and ``server``,
        ``base_dn``, ``bind_dn``, ``bind_pwd``, ``user_filter``,
        ``email_attr``, ``name_attr`` (strings).
    """
    from .secrets_service import get_secret

    def read(key, fallback=""):
        # Falsy stored values (None, "") are treated the same as "not set".
        stored = get_secret(db, key)
        return stored if stored else fallback

    # Lookups are performed in the same order as the resulting dict keys.
    enabled_flag = read("ldap_enabled", "false").lower() == "true"
    server_url = read("ldap_server")
    base_dn = read("ldap_base_dn")
    bind_dn = read("ldap_bind_dn")
    bind_pwd = read("ldap_bind_pwd")
    user_filter = read("ldap_user_filter", "(sAMAccountName={username})")
    tls_flag = read("ldap_tls", "true").lower() == "true"
    email_attr = read("ldap_email_attr", "mail")
    name_attr = read("ldap_name_attr", "displayName")

    return {
        "enabled": enabled_flag,
        "server": server_url,
        "base_dn": base_dn,
        "bind_dn": bind_dn,
        "bind_pwd": bind_pwd,
        "user_filter": user_filter,
        "tls": tls_flag,
        "email_attr": email_attr,
        "name_attr": name_attr,
    }
def is_enabled(db):
    """Tell whether LDAP login is switched on and minimally configured.

    Args:
        db: handle forwarded to ``_get_config``.

    Returns:
        bool: True only when the ``enabled`` flag is set and both the server
        URL and the base DN are non-empty.
    """
    cfg = _get_config(db)
    # A bare ``and`` chain would hand back the last operand (the base DN
    # string, or an empty string) instead of a boolean; coerce explicitly so
    # callers never receive configuration text from a yes/no predicate.
    return bool(cfg["enabled"] and cfg["server"] and cfg["base_dn"])
def _ldap_attr_text(entry, attr_name, default=""):
    """Return one attribute of an LDAP entry as text, or *default* if empty."""
    attr = getattr(entry, attr_name, None)
    # ldap3 attributes expose the decoded content through ``.value``; calling
    # str() on a value-less attribute gives the text of an empty list instead
    # of an empty string, which would leak "[]" into e-mail / name fields.
    value = getattr(attr, "value", attr)
    if isinstance(value, (list, tuple)):
        # Multi-valued attribute: keep the first value.
        value = value[0] if value else None
    if value is None:
        return default
    return str(value) or default


def authenticate(db, username, password):
    """Authenticate a user against the LDAP/AD directory.

    Steps: bind with the configured service account, look the user up with
    the configured filter, then bind again as the entry found using the
    supplied password.

    Args:
        db: handle forwarded to ``_get_config``.
        username: login name substituted for ``{username}`` in the filter.
        password: password to verify.

    Returns:
        ``{"ok": True, "dn": ..., "email": ..., "name": ...}`` on success,
        otherwise ``{"ok": False, "msg": "..."}``.
    """
    # Refuse blank credentials before doing anything else. A simple bind with
    # an empty password is an "unauthenticated bind" (RFC 4513 §5.1.2) that
    # some servers accept, so it must never be forwarded to the directory.
    if not password:
        return {"ok": False, "msg": "Mot de passe incorrect"}
    if not username:
        return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}

    if not LDAP_OK:
        return {"ok": False, "msg": "Module ldap3 non installé"}
    cfg = _get_config(db)
    if not cfg["enabled"]:
        return {"ok": False, "msg": "LDAP désactivé"}
    if not cfg["server"] or not cfg["base_dn"]:
        return {"ok": False, "msg": "LDAP non configuré"}

    # Imported here because ldap3 is optional at module level (see LDAP_OK).
    from ldap3.utils.conv import escape_filter_chars

    # 1. Bind with the service account so the user's DN can be searched.
    try:
        server = Server(cfg["server"], get_info=ALL, use_ssl=cfg["server"].startswith("ldaps://"))
        conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True)
    except Exception as e:
        log.error("LDAP bind failed: %s", e)
        return {"ok": False, "msg": f"Connexion LDAP échouée: {e}"}

    # 2. Look the user up. The login name is untrusted input: escape it so
    #    characters such as "*", "(" or ")" cannot alter the search filter.
    user_filter = cfg["user_filter"].replace("{username}", escape_filter_chars(username))
    try:
        try:
            conn.search(cfg["base_dn"], user_filter,
                        attributes=[cfg["email_attr"], cfg["name_attr"], "distinguishedName"])
        except Exception as e:
            return {"ok": False, "msg": f"Recherche LDAP échouée: {e}"}
        if not conn.entries:
            return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}
        entry = conn.entries[0]
        # Fall back to the entry's own DN when the directory does not expose
        # a populated distinguishedName attribute.
        user_dn = _ldap_attr_text(entry, "distinguishedName", "") or entry.entry_dn
        email = _ldap_attr_text(entry, cfg["email_attr"], "")
        name = _ldap_attr_text(entry, cfg["name_attr"], username)
    finally:
        # Release the service connection on every path, including errors.
        conn.unbind()

    # 3. Bind with the credentials supplied by the user.
    try:
        user_conn = Connection(server, user=user_dn, password=password, auto_bind=True)
        user_conn.unbind()
    except Exception as e:
        # Keep the user-facing message generic; record the cause for admins.
        log.info("LDAP user bind failed for %s: %s", user_dn, e)
        return {"ok": False, "msg": "Mot de passe incorrect"}
    return {"ok": True, "dn": user_dn, "email": email, "name": name}
def test_connection(db):
    """Check that the configured service account can bind (admin diagnostic).

    Args:
        db: handle forwarded to ``_get_config``.

    Returns:
        ``{"ok": True, "msg": ...}`` when the bind and unbind succeed,
        otherwise ``{"ok": False, "msg": ...}`` where the message is either a
        fixed configuration hint or the error text cut to 200 characters.
    """
    if not LDAP_OK:
        return {"ok": False, "msg": "Module ldap3 non installé"}

    settings = _get_config(db)
    if not settings["server"]:
        return {"ok": False, "msg": "Serveur non configuré"}

    try:
        # SSL is switched on purely from the URL scheme.
        over_ssl = settings["server"].startswith("ldaps://")
        directory = Server(settings["server"], get_info=ALL, use_ssl=over_ssl)
        probe = Connection(
            directory,
            user=settings["bind_dn"],
            password=settings["bind_pwd"],
            auto_bind=True,
        )
        probe.unbind()
    except Exception as exc:
        # Truncate so an oversized server error cannot flood the admin UI.
        return {"ok": False, "msg": str(exc)[:200]}

    return {"ok": True, "msg": "Connexion réussie"}