"""Import des membres d'un groupe AD vers la table users + lien avec contacts. 3 champs lies : 1. LDAP/AD (source : groupe AD specifique, ex. CN=secops,...) 2. contacts (par email : match existant, creation si absent) 3. users (par username=sAMAccountName, auth_type='ldap') + users.itop_person_id = contacts.itop_id (si contact matche) Par defaut le groupe AD cible est : CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe La config LDAP (serveur, bind DN, bind pass, base DN) est lue depuis app_secrets via ldap_service. Usage (doit tourner depuis le poste SANEF car l'AD n'est pas joignable du lab) : python tools/import_ldap_group_users.py python tools/import_ldap_group_users.py --group "CN=secops,OU=...,DC=sanef,DC=groupe" --dry-run """ import os import sys import argparse from pathlib import Path from sqlalchemy import create_engine, text ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) DATABASE_URL = (os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db") DEFAULT_GROUP_DN = "CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe" def get_ldap_config(engine): """Recupere la config LDAP depuis app_secrets (reutilise ldap_service).""" from app.services.ldap_service import _get_config with engine.connect() as conn: return _get_config(conn) def fetch_group_members(cfg, group_dn): """Retourne liste de dicts {username, name, email, dn}. Strategie : bind service account -> search pour user dont memberOf contient group_dn. Plus fiable que de lire group.member (limite 1500 DN par defaut). """ from ldap3 import Server, Connection, ALL, SUBTREE use_ssl = cfg["server"].startswith("ldaps://") server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl) conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True) # Filter LDAP : membre direct du groupe (inclut comptes admin, meme sans mail) search_filter = ( f"(&(objectClass=user)(objectCategory=person)" f"(memberOf={group_dn}))" ) conn.search(cfg["base_dn"], search_filter, search_scope=SUBTREE, attributes=["sAMAccountName", "displayName", "mail", "userPrincipalName", "distinguishedName", "userAccountControl"]) members = [] for entry in conn.entries: sam = str(entry.sAMAccountName) if entry.sAMAccountName else None if not sam: print(f" [SKIP] Entry sans sAMAccountName : {entry.entry_dn}") continue # Priorite email : mail > userPrincipalName > fallback sam@sanef.com email = None if entry.mail and str(entry.mail).strip(): email = str(entry.mail).strip().lower() elif entry.userPrincipalName and str(entry.userPrincipalName).strip(): email = str(entry.userPrincipalName).strip().lower() else: email = f"{sam.lower()}@sanef.com" print(f" [INFO] {sam} sans mail AD, fallback : {email}") # Verifier si compte desactive (pour info seulement) uac = entry.userAccountControl.value if entry.userAccountControl else 0 if isinstance(uac, int) and uac & 0x2: print(f" [WARN] {sam} compte AD DESACTIVE (UAC={uac}) — importe quand meme") members.append({ "username": sam.lower(), "display_name": str(entry.displayName) if entry.displayName else sam, "email": email, "dn": str(entry.entry_dn), }) conn.unbind() return members SQL_FIND_CONTACT = text(""" SELECT id, itop_id FROM contacts WHERE lower(email) = :email LIMIT 1 """) SQL_INSERT_CONTACT = text(""" INSERT INTO contacts (name, email, role, team, ldap_dn, is_active, is_verified, created_at, updated_at) VALUES (:name, :email, 'contact_technique', 'SecOps', :ldap_dn, true, true, now(), now()) ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name, ldap_dn = EXCLUDED.ldap_dn, updated_at = now() RETURNING id, itop_id """) SQL_FIND_USER = text(""" SELECT id FROM users WHERE username = :username LIMIT 1 """) SQL_INSERT_USER = text(""" INSERT INTO users (username, display_name, email, role, auth_type, is_active, contact_id, itop_person_id, created_at, updated_at) VALUES (:username, :display_name, :email, 'operator', 'ldap', true, :contact_id, :itop_pid, now(), now()) """) SQL_UPDATE_USER = text(""" UPDATE users SET display_name = :display_name, email = :email, auth_type = 'ldap', -- is_active PRESERVE : jamais reactive un user desactive manuellement contact_id = COALESCE(:contact_id, contact_id), itop_person_id = COALESCE(:itop_pid, itop_person_id), updated_at = now() WHERE id = :uid """) def main(): parser = argparse.ArgumentParser() parser.add_argument("--group", default=DEFAULT_GROUP_DN, help=f"DN du groupe AD (defaut: {DEFAULT_GROUP_DN})") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() print(f"[INFO] Groupe cible : {args.group}") engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}") cfg = get_ldap_config(engine) if not cfg["enabled"]: print("[ERR] LDAP desactive dans app_secrets (ldap_enabled != true).") sys.exit(1) if not cfg["server"] or not cfg["base_dn"]: print("[ERR] LDAP non configure (server ou base_dn manquant).") sys.exit(1) print(f"[INFO] LDAP server : {cfg['server']} base_dn : {cfg['base_dn']}") try: members = fetch_group_members(cfg, args.group) except Exception as e: print(f"[ERR] LDAP search failed : {e}") sys.exit(2) print(f"[INFO] Membres AD retrouves : {len(members)}") for m in members[:5]: print(f" {m['username']:20s} {m['email']:40s} {m['display_name']}") if len(members) > 5: print(f" ... ({len(members) - 5} autres)") if args.dry_run: print("[DRY-RUN] Aucun write") return inserted_u = updated_u = created_c = linked_itop = linked_contact = 0 with engine.begin() as conn: for m in members: # 1. Contact : find or create (stocke ldap_dn pour tracer source AD) row = conn.execute(SQL_FIND_CONTACT, {"email": m["email"]}).fetchone() if row: contact_id, itop_id = row # Mettre a jour ldap_dn si contact deja existant sans la trace conn.execute(text( "UPDATE contacts SET ldap_dn = COALESCE(ldap_dn, :dn) WHERE id = :cid" ), {"dn": m["dn"], "cid": contact_id}) else: r = conn.execute(SQL_INSERT_CONTACT, { "name": m["display_name"], "email": m["email"], "ldap_dn": m["dn"], }).fetchone() contact_id, itop_id = r created_c += 1 # 2. User : upsert + lien contact_id (FK) + itop_person_id (historique) u = conn.execute(SQL_FIND_USER, {"username": m["username"]}).fetchone() params = { "username": m["username"], "display_name": m["display_name"], "email": m["email"], "contact_id": contact_id, "itop_pid": itop_id, # None si contact sans itop_id } if u: conn.execute(SQL_UPDATE_USER, {**params, "uid": u[0]}) updated_u += 1 else: conn.execute(SQL_INSERT_USER, params) inserted_u += 1 linked_contact += 1 if itop_id: linked_itop += 1 print(f"[OK] Termine :") print(f" users : INSERT {inserted_u} UPDATE {updated_u}") print(f" contacts : CREATE {created_c}") print(f" links : users.contact_id = {linked_contact}") print(f" users.itop_person_id = {linked_itop}") if __name__ == "__main__": main()