"""LDAP/AD service -- authentication against a directory.

Configuration is read through ``secrets_service.get_secret`` with these keys:

- ldap_enabled     : "true" / "false"
- ldap_server      : server URL (e.g. ldaps://ad.sanef.com:636)
- ldap_base_dn     : base DN (e.g. DC=sanef,DC=com)
- ldap_bind_dn     : DN of the bind account
                     (e.g. CN=svc_pc,OU=SVC,DC=sanef,DC=com)
- ldap_bind_pwd    : bind account password (stored encrypted via
                     secrets_service)
- ldap_user_filter : user search filter (e.g. (sAMAccountName={username}))
- ldap_tls         : "true" / "false"
- ldap_email_attr  : attribute holding the e-mail address (default: mail)
- ldap_name_attr   : attribute holding the display name
                     (default: displayName)
"""
import logging

from sqlalchemy import text

log = logging.getLogger(__name__)

# ldap3 is optional: when it is missing, LDAP_OK is False and the public
# functions below report the problem instead of raising.
try:
    from ldap3 import Server, Connection, ALL, NTLM, SIMPLE, Tls
    from ldap3.utils.conv import escape_filter_chars
    import ssl
    LDAP_OK = True
except ImportError:
    LDAP_OK = False


def _get_config(db):
    """Return the LDAP configuration as a dict, read through ``get_secret``.

    Missing or empty values fall back to the defaults given below.
    """
    from .secrets_service import get_secret

    def s(k, default=""):
        return get_secret(db, k) or default

    return {
        "enabled": s("ldap_enabled", "false").lower() == "true",
        "server": s("ldap_server"),
        "base_dn": s("ldap_base_dn"),
        "bind_dn": s("ldap_bind_dn"),
        "bind_pwd": s("ldap_bind_pwd"),
        "user_filter": s("ldap_user_filter", "(sAMAccountName={username})"),
        # NOTE(review): "tls" is read here but no function in this module
        # applies it; transport security currently depends only on the
        # "ldaps://" scheme of the server URL. Confirm the intended meaning
        # before wiring it in.
        "tls": s("ldap_tls", "true").lower() == "true",
        "email_attr": s("ldap_email_attr", "mail"),
        "name_attr": s("ldap_name_attr", "displayName"),
    }


def _bind_service_account(cfg):
    """Open a connection bound with the configured service account.

    Returns a ``(server, connection)`` tuple. Any exception raised by ldap3
    while connecting or binding is propagated to the caller.
    """
    server = Server(
        cfg["server"],
        get_info=ALL,
        use_ssl=cfg["server"].startswith("ldaps://"),
    )
    conn = Connection(
        server,
        user=cfg["bind_dn"],
        password=cfg["bind_pwd"],
        auto_bind=True,
    )
    return server, conn


def is_enabled(db):
    """Return True when LDAP is enabled and has a server and a base DN."""
    cfg = _get_config(db)
    # bool() so callers get a real boolean rather than the base DN string.
    return bool(cfg["enabled"] and cfg["server"] and cfg["base_dn"])


def authenticate(db, username, password):
    """Try to authenticate ``username`` / ``password`` against LDAP.

    The service account is used to look up the user's DN, then a second bind
    is attempted with that DN and the supplied password.

    Returns:
        ``{"ok": True, "dn": ..., "email": ..., "name": ...}`` on success,
        ``{"ok": False, "msg": "..."}`` otherwise.
    """
    if not LDAP_OK:
        return {"ok": False, "msg": "Module ldap3 non installé"}
    cfg = _get_config(db)
    if not cfg["enabled"]:
        return {"ok": False, "msg": "LDAP désactivé"}
    if not cfg["server"] or not cfg["base_dn"]:
        return {"ok": False, "msg": "LDAP non configuré"}

    # Refuse empty credentials before any LDAP traffic. A simple bind with a
    # DN and an empty password is an "unauthenticated bind" (RFC 4513) that
    # some servers accept, so it must never count as a successful login.
    if not username:
        return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}
    if not password:
        return {"ok": False, "msg": "Mot de passe incorrect"}

    # 1. Bind with the service account in order to look up the user's DN.
    try:
        server, conn = _bind_service_account(cfg)
    except Exception as e:
        log.error("LDAP bind failed: %s", e)
        return {"ok": False, "msg": f"Connexion LDAP échouée: {e}"}

    # 2. Search for the user. The username is escaped so that filter
    #    metacharacters such as "*", "(" or ")" cannot alter the query.
    user_filter = cfg["user_filter"].replace(
        "{username}", escape_filter_chars(username)
    )
    try:
        try:
            conn.search(
                cfg["base_dn"],
                user_filter,
                attributes=[
                    cfg["email_attr"],
                    cfg["name_attr"],
                    "distinguishedName",
                ],
            )
        except Exception as e:
            return {"ok": False, "msg": f"Recherche LDAP échouée: {e}"}
        if not conn.entries:
            return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}
        entry = conn.entries[0]
        if hasattr(entry, "distinguishedName"):
            user_dn = str(entry.distinguishedName)
        else:
            user_dn = entry.entry_dn
        email = str(getattr(entry, cfg["email_attr"], "")) or ""
        name = str(getattr(entry, cfg["name_attr"], "")) or username
    finally:
        # Always release the service connection, including on error paths.
        conn.unbind()

    # 3. Bind with the supplied credentials to verify the password.
    try:
        user_conn = Connection(
            server, user=user_dn, password=password, auto_bind=True
        )
        user_conn.unbind()
    except Exception as e:
        # Log the cause without the password; callers get a generic message.
        log.warning("LDAP user bind failed for %s: %s", user_dn, e)
        return {"ok": False, "msg": "Mot de passe incorrect"}

    return {"ok": True, "dn": user_dn, "email": email, "name": name}


def test_connection(db):
    """Test the LDAP connection with the bind account (for admins).

    Returns ``{"ok": bool, "msg": str}``; on failure ``msg`` is the exception
    text truncated to 200 characters.
    """
    if not LDAP_OK:
        return {"ok": False, "msg": "Module ldap3 non installé"}
    cfg = _get_config(db)
    if not cfg["server"]:
        return {"ok": False, "msg": "Serveur non configuré"}
    try:
        _server, conn = _bind_service_account(cfg)
        conn.unbind()
        return {"ok": True, "msg": "Connexion réussie"}
    except Exception as e:
        return {"ok": False, "msg": str(e)[:200]}