"""Service LDAP/AD — authentification via annuaire. Configuration via settings (clés) : - ldap_enabled : "true" / "false" - ldap_server : URL du serveur (ex: ldaps://ad.sanef.com:636) - ldap_base_dn : DN de base (ex: DC=sanef,DC=com) - ldap_bind_dn : DN du compte de bind (ex: CN=svc_pc,OU=SVC,DC=sanef,DC=com) - ldap_bind_pwd : mot de passe (stocké chiffré via secrets_service) - ldap_user_filter : filtre utilisateur (ex: (sAMAccountName={username})) - ldap_tls : "true" / "false" Debug : journalise le détail des connexions LDAPS dans logs/ldap_debug.log (niveau DEBUG). Les mots de passe ne sont JAMAIS écrits dans les logs. """ import os import logging from logging.handlers import RotatingFileHandler from sqlalchemy import text # --- Logger dédié -> logs/ldap_debug.log (DEBUG) ------------------------------- # Chemin : .../app/services/ldap_service.py -> remonte de 3 niveaux = racine projet _LOG_DIR = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "logs", ) log = logging.getLogger("patchcenter.ldap") if not log.handlers: try: os.makedirs(_LOG_DIR, exist_ok=True) _handler = RotatingFileHandler( os.path.join(_LOG_DIR, "ldap_debug.log"), maxBytes=2_000_000, backupCount=5, encoding="utf-8", ) _handler.setFormatter(logging.Formatter( "%(asctime)s %(levelname)-7s [ldap] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", )) log.addHandler(_handler) log.setLevel(logging.DEBUG) log.propagate = False except Exception as _e: # pragma: no cover - ne jamais casser l'app pour un log logging.getLogger(__name__).warning("Impossible d'initialiser ldap_debug.log: %s", _e) try: from ldap3 import Server, Connection, ALL, NTLM, SIMPLE, Tls import ssl LDAP_OK = True # Active la journalisation protocolaire ldap3 vers le meme fichier. # NOTE: ldap3 masque les donnees sensibles (mots de passe) par defaut. try: from ldap3.utils.log import set_library_log_detail_level, EXTENDED set_library_log_detail_level(EXTENDED) _l3 = logging.getLogger("ldap3") _l3.setLevel(logging.DEBUG) if log.handlers and not _l3.handlers: _l3.addHandler(log.handlers[0]) except Exception: pass except ImportError: LDAP_OK = False log.warning("Module ldap3 non installe : authentification LDAP indisponible") def _get_config(db): """Retourne la config LDAP depuis app_secrets (via get_secret).""" from .secrets_service import get_secret def s(k, default=""): return get_secret(db, k) or default return { "enabled": s("ldap_enabled", "false").lower() == "true", "server": s("ldap_server"), "base_dn": s("ldap_base_dn"), "bind_dn": s("ldap_bind_dn"), "bind_pwd": s("ldap_bind_pwd"), "user_filter": s("ldap_user_filter", "(sAMAccountName={username})"), "tls": s("ldap_tls", "true").lower() == "true", "email_attr": s("ldap_email_attr", "mail"), "name_attr": s("ldap_name_attr", "displayName"), "required_group": s("ldap_required_group", ""), "default_role": s("ldap_default_role", "operator"), } def is_enabled(db): """LDAP activé et configuré ?""" cfg = _get_config(db) return cfg["enabled"] and cfg["server"] and cfg["base_dn"] def authenticate(db, username, password): """Tente l'authentification LDAP. Retourne dict {ok, email, name, dn} ou {ok: False, msg: '...'}""" if not LDAP_OK: return {"ok": False, "msg": "Module ldap3 non installé"} cfg = _get_config(db) use_ssl = cfg["server"].startswith("ldaps://") log.debug( "AUTH demande user=%s | server=%s ssl=%s tls=%s base_dn=%s bind_dn=%s filter=%s required_group=%s", username, cfg["server"], use_ssl, cfg["tls"], cfg["base_dn"], cfg["bind_dn"], cfg["user_filter"], cfg["required_group"] or "(aucun)", ) if not cfg["enabled"]: log.debug("AUTH user=%s -> LDAP desactive", username) return {"ok": False, "msg": "LDAP désactivé"} if not cfg["server"] or not cfg["base_dn"]: log.debug("AUTH user=%s -> LDAP non configure (server/base_dn manquant)", username) return {"ok": False, "msg": "LDAP non configuré"} # 1. Bind service account pour chercher le DN de l'utilisateur try: server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl) conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True) log.debug("AUTH user=%s -> bind compte de service OK (result=%s)", username, getattr(conn, "result", None)) except Exception as e: log.error("AUTH user=%s -> bind compte de service ECHEC: %s", username, e) return {"ok": False, "msg": f"Connexion LDAP échouée: {e}"} # 2. Recherche de l'utilisateur user_filter = cfg["user_filter"].replace("{username}", username) try: conn.search(cfg["base_dn"], user_filter, attributes=[cfg["email_attr"], cfg["name_attr"], "distinguishedName", "memberOf"]) log.debug("AUTH user=%s -> recherche filter=%s base=%s entries=%d", username, user_filter, cfg["base_dn"], len(conn.entries)) except Exception as e: log.error("AUTH user=%s -> recherche ECHEC: %s", username, e) conn.unbind() return {"ok": False, "msg": f"Recherche LDAP échouée: {e}"} if not conn.entries: log.debug("AUTH user=%s -> utilisateur introuvable", username) conn.unbind() return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"} entry = conn.entries[0] user_dn = str(entry.distinguishedName) if hasattr(entry, "distinguishedName") else entry.entry_dn email = str(getattr(entry, cfg["email_attr"], "")) or "" name = str(getattr(entry, cfg["name_attr"], "")) or username groups = list(entry.memberOf.values) if hasattr(entry, "memberOf") and entry.memberOf else [] conn.unbind() log.debug("AUTH user=%s -> trouve dn=%s email=%s nb_groupes=%d", username, user_dn, email, len(groups)) # 3. Verification appartenance groupe (si configure) required = (cfg.get("required_group") or "").strip() if required: # Match insensible a la casse + espaces normalises norm_required = required.lower().replace(" ", "") is_member = any(norm_required == g.lower().replace(" ", "") for g in groups) log.debug("AUTH user=%s -> controle groupe requis=%s membre=%s", username, required, is_member) if not is_member: return {"ok": False, "msg": f"Acces refuse: utilisateur non membre du groupe requis"} # 4. Bind avec les credentials fournis try: user_conn = Connection(server, user=user_dn, password=password, auto_bind=True) log.debug("AUTH user=%s -> bind utilisateur OK (result=%s)", username, getattr(user_conn, "result", None)) user_conn.unbind() except Exception as e: log.warning("AUTH user=%s -> bind utilisateur ECHEC (mot de passe incorrect ?): %s", username, e) return {"ok": False, "msg": "Mot de passe incorrect"} log.info("AUTH SUCCES user=%s dn=%s", username, user_dn) return {"ok": True, "dn": user_dn, "email": email, "name": name, "groups": groups, "default_role": cfg.get("default_role", "operator")} def test_connection(db): """Test la connexion LDAP avec le compte de bind (pour admin).""" if not LDAP_OK: return {"ok": False, "msg": "Module ldap3 non installé"} cfg = _get_config(db) if not cfg["server"]: return {"ok": False, "msg": "Serveur non configuré"} use_ssl = cfg["server"].startswith("ldaps://") log.debug("TEST connexion -> server=%s ssl=%s bind_dn=%s", cfg["server"], use_ssl, cfg["bind_dn"]) try: server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl) conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True) log.info("TEST connexion -> OK (result=%s)", getattr(conn, "result", None)) conn.unbind() return {"ok": True, "msg": "Connexion réussie"} except Exception as e: log.error("TEST connexion -> ECHEC: %s", e) return {"ok": False, "msg": str(e)[:200]}