- Settings ldap_required_group (DN groupe autorise) + ldap_default_role - ldap_authenticate verifie memberOf vs required_group avant bind - auth.py: si user inconnu + LDAP + groupe OK -> auto-create user, role default, zero permission (admin doit assigner via /users)
127 lines
4.9 KiB
Python
127 lines
4.9 KiB
Python
"""Service LDAP/AD — authentification via annuaire.
|
|
|
|
Configuration via settings (clés) :
|
|
- ldap_enabled : "true" / "false"
|
|
- ldap_server : URL du serveur (ex: ldaps://ad.sanef.com:636)
|
|
- ldap_base_dn : DN de base (ex: DC=sanef,DC=com)
|
|
- ldap_bind_dn : DN du compte de bind (ex: CN=svc_pc,OU=SVC,DC=sanef,DC=com)
|
|
- ldap_bind_pwd : mot de passe (stocké chiffré via secrets_service)
|
|
- ldap_user_filter : filtre utilisateur (ex: (sAMAccountName={username}))
|
|
- ldap_tls : "true" / "false"
|
|
"""
|
|
import logging
|
|
from sqlalchemy import text
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from ldap3 import Server, Connection, ALL, NTLM, SIMPLE, Tls
|
|
import ssl
|
|
LDAP_OK = True
|
|
except ImportError:
|
|
LDAP_OK = False
|
|
|
|
|
|
def _get_config(db):
|
|
"""Retourne la config LDAP depuis app_secrets (via get_secret)."""
|
|
from .secrets_service import get_secret
|
|
|
|
def s(k, default=""):
|
|
return get_secret(db, k) or default
|
|
|
|
return {
|
|
"enabled": s("ldap_enabled", "false").lower() == "true",
|
|
"server": s("ldap_server"),
|
|
"base_dn": s("ldap_base_dn"),
|
|
"bind_dn": s("ldap_bind_dn"),
|
|
"bind_pwd": s("ldap_bind_pwd"),
|
|
"user_filter": s("ldap_user_filter", "(sAMAccountName={username})"),
|
|
"tls": s("ldap_tls", "true").lower() == "true",
|
|
"email_attr": s("ldap_email_attr", "mail"),
|
|
"name_attr": s("ldap_name_attr", "displayName"),
|
|
"required_group": s("ldap_required_group", ""),
|
|
"default_role": s("ldap_default_role", "operator"),
|
|
}
|
|
|
|
|
|
def is_enabled(db):
|
|
"""LDAP activé et configuré ?"""
|
|
cfg = _get_config(db)
|
|
return cfg["enabled"] and cfg["server"] and cfg["base_dn"]
|
|
|
|
|
|
def authenticate(db, username, password):
|
|
"""Tente l'authentification LDAP.
|
|
Retourne dict {ok, email, name, dn} ou {ok: False, msg: '...'}"""
|
|
if not LDAP_OK:
|
|
return {"ok": False, "msg": "Module ldap3 non installé"}
|
|
|
|
cfg = _get_config(db)
|
|
if not cfg["enabled"]:
|
|
return {"ok": False, "msg": "LDAP désactivé"}
|
|
if not cfg["server"] or not cfg["base_dn"]:
|
|
return {"ok": False, "msg": "LDAP non configuré"}
|
|
|
|
# 1. Bind service account pour chercher le DN de l'utilisateur
|
|
try:
|
|
server = Server(cfg["server"], get_info=ALL, use_ssl=cfg["server"].startswith("ldaps://"))
|
|
conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True)
|
|
except Exception as e:
|
|
log.error(f"LDAP bind failed: {e}")
|
|
return {"ok": False, "msg": f"Connexion LDAP échouée: {e}"}
|
|
|
|
# 2. Recherche de l'utilisateur
|
|
user_filter = cfg["user_filter"].replace("{username}", username)
|
|
try:
|
|
conn.search(cfg["base_dn"], user_filter,
|
|
attributes=[cfg["email_attr"], cfg["name_attr"], "distinguishedName", "memberOf"])
|
|
except Exception as e:
|
|
conn.unbind()
|
|
return {"ok": False, "msg": f"Recherche LDAP échouée: {e}"}
|
|
|
|
if not conn.entries:
|
|
conn.unbind()
|
|
return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"}
|
|
|
|
entry = conn.entries[0]
|
|
user_dn = str(entry.distinguishedName) if hasattr(entry, "distinguishedName") else entry.entry_dn
|
|
email = str(getattr(entry, cfg["email_attr"], "")) or ""
|
|
name = str(getattr(entry, cfg["name_attr"], "")) or username
|
|
groups = list(entry.memberOf.values) if hasattr(entry, "memberOf") and entry.memberOf else []
|
|
conn.unbind()
|
|
|
|
# 3. Verification appartenance groupe (si configure)
|
|
required = (cfg.get("required_group") or "").strip()
|
|
if required:
|
|
# Match insensible a la casse + espaces normalises
|
|
norm_required = required.lower().replace(" ", "")
|
|
is_member = any(norm_required == g.lower().replace(" ", "") for g in groups)
|
|
if not is_member:
|
|
return {"ok": False, "msg": f"Acces refuse: utilisateur non membre du groupe requis"}
|
|
|
|
# 4. Bind avec les credentials fournis
|
|
try:
|
|
user_conn = Connection(server, user=user_dn, password=password, auto_bind=True)
|
|
user_conn.unbind()
|
|
except Exception as e:
|
|
return {"ok": False, "msg": "Mot de passe incorrect"}
|
|
|
|
return {"ok": True, "dn": user_dn, "email": email, "name": name,
|
|
"groups": groups, "default_role": cfg.get("default_role", "operator")}
|
|
|
|
|
|
def test_connection(db):
|
|
"""Test la connexion LDAP avec le compte de bind (pour admin)."""
|
|
if not LDAP_OK:
|
|
return {"ok": False, "msg": "Module ldap3 non installé"}
|
|
cfg = _get_config(db)
|
|
if not cfg["server"]:
|
|
return {"ok": False, "msg": "Serveur non configuré"}
|
|
try:
|
|
server = Server(cfg["server"], get_info=ALL, use_ssl=cfg["server"].startswith("ldaps://"))
|
|
conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True)
|
|
conn.unbind()
|
|
return {"ok": True, "msg": "Connexion réussie"}
|
|
except Exception as e:
|
|
return {"ok": False, "msg": str(e)[:200]}
|