patchcenter/app/services/ldap_service.py
Admin MPCZ 53d4f71607 LDAP: restriction groupe AD + auto-provisioning users (sans permissions)
- Settings ldap_required_group (DN groupe autorise) + ldap_default_role
- ldap_authenticate verifie memberOf vs required_group avant bind
- auth.py: si user inconnu + LDAP + groupe OK -> auto-create user, role default,
  zero permission (admin doit assigner via /users)
2026-04-15 11:45:33 +02:00

127 lines
4.9 KiB
Python

"""Service LDAP/AD — authentification via annuaire.
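Configuration is read from settings under the following keys:
- ldap_enabled : "true" / "false" (default "false")
- ldap_server : server URL (e.g. ldaps://ad.sanef.com:636)
- ldap_base_dn : base DN for searches (e.g. DC=sanef,DC=com)
- ldap_bind_dn : DN of the bind account (e.g. CN=svc_pc,OU=SVC,DC=sanef,DC=com)
- ldap_bind_pwd : bind account password (stored encrypted via secrets_service)
- ldap_user_filter : user search filter (default (sAMAccountName={username}))
- ldap_tls : "true" / "false" (default "true")
- ldap_email_attr : attribute holding the user's email (default mail)
- ldap_name_attr : attribute holding the user's display name (default displayName)
- ldap_required_group : DN of the group a user must belong to (empty = no restriction)
- ldap_default_role : role reported for authenticated users (default operator)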
"""
import logging
from sqlalchemy import text
log = logging.getLogger(__name__)
try:
from ldap3 import Server, Connection, ALL, NTLM, SIMPLE, Tls
import ssl
LDAP_OK = True
except ImportError:
LDAP_OK = False
def _get_config(db):
    """Build the LDAP configuration dict from values read via ``get_secret``.

    Every key is looked up with ``get_secret(db, key)``; a falsy result is
    replaced by the key's default. ``enabled`` and ``tls`` are converted to
    booleans by comparing the lower-cased value to ``"true"``; every other
    entry is returned as read.
    """
    from .secrets_service import get_secret

    def read(key, fallback=""):
        stored = get_secret(db, key)
        return stored if stored else fallback

    def read_flag(key, fallback):
        return read(key, fallback).lower() == "true"

    # Built incrementally, in the same lookup order as the resulting keys.
    config = {}
    config["enabled"] = read_flag("ldap_enabled", "false")
    config["server"] = read("ldap_server")
    config["base_dn"] = read("ldap_base_dn")
    config["bind_dn"] = read("ldap_bind_dn")
    config["bind_pwd"] = read("ldap_bind_pwd")
    config["user_filter"] = read("ldap_user_filter", "(sAMAccountName={username})")
    config["tls"] = read_flag("ldap_tls", "true")
    config["email_attr"] = read("ldap_email_attr", "mail")
    config["name_attr"] = read("ldap_name_attr", "displayName")
    config["required_group"] = read("ldap_required_group", "")
    config["default_role"] = read("ldap_default_role", "operator")
    return config
def is_enabled(db):
    """Return True when LDAP is enabled and both a server and a base DN are set.

    The result is always a real ``bool``; a bare ``and`` chain would return
    its last operand (the base DN string, or an empty string) instead.
    """
    cfg = _get_config(db)
    return bool(cfg["enabled"] and cfg["server"] and cfg["base_dn"])
def _first_value(entry, attr_name):
    """Return the first value of ``attr_name`` on an ldap3 entry as ``str``, else ``""``.

    Reads the attribute's ``values`` list rather than calling ``str()`` on the
    attribute object, so an empty or multi-valued attribute never ends up as a
    list representation in the returned email / name / DN.
    """
    attr = getattr(entry, attr_name, None)
    values = getattr(attr, "values", None) if attr is not None else None
    if not values:
        return ""
    return str(values[0])


def authenticate(db, username, password):
    """Authenticate ``username`` / ``password`` against the LDAP directory.

    Steps: bind with the service account, search for the user, optionally
    check membership of the required group (compared against the entry's
    ``memberOf`` values), then bind as the user with the supplied password.

    Args:
        db: handle passed through to ``_get_config``.
        username: login name substituted for ``{username}`` in the user filter.
        password: password to verify with a bind as the user's DN.

    Returns:
        On success: ``{"ok": True, "dn", "email", "name", "groups", "default_role"}``.
        On failure: ``{"ok": False, "msg": "..."}``.
    """
    if not LDAP_OK:
        return {"ok": False, "msg": "Module ldap3 non installé"}
    cfg = _get_config(db)
    if not cfg["enabled"]:
        return {"ok": False, "msg": "LDAP désactivé"}
    if not cfg["server"] or not cfg["base_dn"]:
        return {"ok": False, "msg": "LDAP non configuré"}

    # Reject empty credentials before any LDAP traffic. An empty password turns
    # a simple bind into an unauthenticated bind (RFC 4513, section 5.1.2),
    # which a server may accept -- that must never count as a successful login.
    if not username:
        return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}
    if not password:
        return {"ok": False, "msg": "Mot de passe incorrect"}

    # Safe to import here: LDAP_OK guarantees ldap3 is installed.
    from ldap3.utils.conv import escape_filter_chars

    # 1. Bind with the service account to look up the user's DN.
    try:
        server = Server(cfg["server"], get_info=ALL, use_ssl=cfg["server"].startswith("ldaps://"))
        conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True)
    except Exception as e:
        log.error("LDAP bind failed: %s", e)
        return {"ok": False, "msg": f"Connexion LDAP échouée: {e}"}

    # 2. Search for the user. The username is untrusted input, so it is escaped
    #    before being inserted into the filter (prevents LDAP filter injection).
    user_filter = cfg["user_filter"].replace("{username}", escape_filter_chars(username))
    try:
        try:
            conn.search(cfg["base_dn"], user_filter,
                        attributes=[cfg["email_attr"], cfg["name_attr"], "distinguishedName", "memberOf"])
        except Exception as e:
            return {"ok": False, "msg": f"Recherche LDAP échouée: {e}"}
        if not conn.entries:
            return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}
        entry = conn.entries[0]
        user_dn = _first_value(entry, "distinguishedName") or entry.entry_dn
        email = _first_value(entry, cfg["email_attr"])
        name = _first_value(entry, cfg["name_attr"]) or username
        groups = list(entry.memberOf.values) if hasattr(entry, "memberOf") and entry.memberOf else []
    finally:
        # Release the service connection on every path, including unexpected errors.
        conn.unbind()

    # 3. Group membership check (only when a required group is configured).
    # NOTE(review): this runs before the password is verified, so the refusal
    # message reveals group membership to an unauthenticated caller -- confirm
    # this ordering is intended.
    required = (cfg.get("required_group") or "").strip()
    if required:
        # Case-insensitive comparison with all spaces removed.
        norm_required = required.lower().replace(" ", "")
        is_member = any(norm_required == g.lower().replace(" ", "") for g in groups)
        if not is_member:
            return {"ok": False, "msg": "Acces refuse: utilisateur non membre du groupe requis"}

    # 4. Verify the password by binding as the user.
    try:
        user_conn = Connection(server, user=user_dn, password=password, auto_bind=True)
        user_conn.unbind()
    except Exception as e:
        # Keep the generic message for the caller; record the cause for operators.
        log.warning("LDAP user bind failed for %s: %s", user_dn, e)
        return {"ok": False, "msg": "Mot de passe incorrect"}
    return {"ok": True, "dn": user_dn, "email": email, "name": name,
            "groups": groups, "default_role": cfg.get("default_role", "operator")}
def test_connection(db):
    """Try an LDAP bind with the configured bind account (admin diagnostic).

    Returns ``{"ok": True, "msg": ...}`` when the bind and unbind succeed,
    otherwise ``{"ok": False, "msg": ...}``, where the message is the error
    text truncated to 200 characters.
    """
    if not LDAP_OK:
        return {"ok": False, "msg": "Module ldap3 non installé"}

    settings = _get_config(db)
    url = settings["server"]
    if not url:
        return {"ok": False, "msg": "Serveur non configuré"}

    try:
        ldap_server = Server(url, get_info=ALL, use_ssl=url.startswith("ldaps://"))
        probe = Connection(
            ldap_server,
            user=settings["bind_dn"],
            password=settings["bind_pwd"],
            auto_bind=True,
        )
        probe.unbind()
    except Exception as exc:
        return {"ok": False, "msg": str(exc)[:200]}
    return {"ok": True, "msg": "Connexion réussie"}