Add import_ldap_group_users + FK users.contact_id + contacts.ldap_dn

This commit is contained in:
Pierre & Lumière 2026-04-17 12:16:24 +00:00
parent 2ab2ceabba
commit 2a4c785535
3 changed files with 264 additions and 0 deletions

View File

@ -0,0 +1,44 @@
-- Migration 2026-04-17 : lier users ↔ contacts ↔ LDAP proprement (FK + index)
--
-- Avant : users.itop_person_id (int) pointe vers iTop (pas vers contacts.id)
-- -> lien indirect fragile entre users et contacts via itop_id
--
-- Apres : users.contact_id (FK propre vers contacts.id)
-- contacts.ldap_dn (trace la source AD quand le contact vient d'un import LDAP)
-- Les 3 tables sont jointes directement : users.contact_id = contacts.id
-- La source LDAP est identifiee par contacts.ldap_dn IS NOT NULL et/ou
-- users.auth_type = 'ldap'.
BEGIN;
-- 1. users.contact_id : FK vers contacts.id
ALTER TABLE users ADD COLUMN IF NOT EXISTS contact_id INTEGER;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'users_contact_id_fkey'
) THEN
ALTER TABLE users ADD CONSTRAINT users_contact_id_fkey
FOREIGN KEY (contact_id) REFERENCES contacts(id) ON DELETE SET NULL;
END IF;
END$$;
CREATE INDEX IF NOT EXISTS idx_users_contact_id ON users (contact_id);
-- 2. contacts.ldap_dn : trace provenance AD
ALTER TABLE contacts ADD COLUMN IF NOT EXISTS ldap_dn varchar(500);
CREATE INDEX IF NOT EXISTS idx_contacts_ldap_dn ON contacts (ldap_dn)
WHERE ldap_dn IS NOT NULL;
-- 3. Backfill users.contact_id depuis users.email <-> contacts.email
-- (pour les users deja presents dont l'email matche un contact)
UPDATE users u
SET contact_id = c.id
FROM contacts c
WHERE u.contact_id IS NULL
AND u.email IS NOT NULL
AND lower(u.email) = lower(c.email);
COMMENT ON COLUMN users.contact_id IS 'FK vers contacts.id — lien direct user ↔ contact (le meme email)';
COMMENT ON COLUMN contacts.ldap_dn IS 'DN AD d''ou provient ce contact (import LDAP). NULL si import iTop ou saisie manuelle';
COMMIT;

View File

@ -0,0 +1,206 @@
"""Import des membres d'un groupe AD vers la table users + lien avec contacts.
3 champs lies :
1. LDAP/AD (source : groupe AD specifique, ex. CN=secops,...)
2. contacts (par email : match existant, creation si absent)
3. users (par username=sAMAccountName, auth_type='ldap')
+ users.itop_person_id = contacts.itop_id (si contact matche)
Par defaut le groupe AD cible est :
CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe
La config LDAP (serveur, bind DN, bind pass, base DN) est lue depuis app_secrets
via ldap_service.
Usage (doit tourner depuis le poste SANEF car l'AD n'est pas joignable du lab) :
python tools/import_ldap_group_users.py
python tools/import_ldap_group_users.py --group "CN=secops,OU=...,DC=sanef,DC=groupe" --dry-run
"""
import os
import sys
import argparse
from pathlib import Path
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
DEFAULT_GROUP_DN = "CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe"
def get_ldap_config(engine):
"""Recupere la config LDAP depuis app_secrets (reutilise ldap_service)."""
from app.services.ldap_service import _get_config
with engine.connect() as conn:
return _get_config(conn)
def fetch_group_members(cfg, group_dn):
"""Retourne liste de dicts {username, name, email, dn}.
Strategie : bind service account -> search pour user dont memberOf contient group_dn.
Plus fiable que de lire group.member (limite 1500 DN par defaut).
"""
from ldap3 import Server, Connection, ALL, SUBTREE
use_ssl = cfg["server"].startswith("ldaps://")
server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl)
conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"],
auto_bind=True)
# Filter LDAP : user actif, membre direct du groupe
search_filter = (
f"(&(objectClass=user)(objectCategory=person)"
f"(!(userAccountControl:1.2.840.113556.1.4.803:=2))"
f"(memberOf={group_dn}))"
)
conn.search(cfg["base_dn"], search_filter, search_scope=SUBTREE,
attributes=["sAMAccountName", "displayName", "mail",
"distinguishedName", "userAccountControl"])
members = []
for entry in conn.entries:
email = str(entry.mail) if entry.mail else None
if not email:
continue
members.append({
"username": str(entry.sAMAccountName).lower(),
"display_name": str(entry.displayName) if entry.displayName else str(entry.sAMAccountName),
"email": email.lower(),
"dn": str(entry.distinguishedName),
})
conn.unbind()
return members
SQL_FIND_CONTACT = text("""
SELECT id, itop_id FROM contacts WHERE lower(email) = :email LIMIT 1
""")
SQL_INSERT_CONTACT = text("""
INSERT INTO contacts (name, email, role, team, ldap_dn,
is_active, is_verified, created_at, updated_at)
VALUES (:name, :email, 'contact_technique', 'SecOps', :ldap_dn,
true, true, now(), now())
ON CONFLICT (email) DO UPDATE SET
name = EXCLUDED.name,
ldap_dn = EXCLUDED.ldap_dn,
updated_at = now()
RETURNING id, itop_id
""")
SQL_FIND_USER = text("""
SELECT id FROM users WHERE username = :username LIMIT 1
""")
SQL_INSERT_USER = text("""
INSERT INTO users (username, display_name, email, role, auth_type,
is_active, contact_id, itop_person_id,
created_at, updated_at)
VALUES (:username, :display_name, :email, 'operator', 'ldap',
true, :contact_id, :itop_pid,
now(), now())
""")
SQL_UPDATE_USER = text("""
UPDATE users SET
display_name = :display_name,
email = :email,
auth_type = 'ldap',
is_active = true,
contact_id = COALESCE(:contact_id, contact_id),
itop_person_id = COALESCE(:itop_pid, itop_person_id),
updated_at = now()
WHERE id = :uid
""")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--group", default=DEFAULT_GROUP_DN,
help=f"DN du groupe AD (defaut: {DEFAULT_GROUP_DN})")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
print(f"[INFO] Groupe cible : {args.group}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
cfg = get_ldap_config(engine)
if not cfg["enabled"]:
print("[ERR] LDAP desactive dans app_secrets (ldap_enabled != true).")
sys.exit(1)
if not cfg["server"] or not cfg["base_dn"]:
print("[ERR] LDAP non configure (server ou base_dn manquant).")
sys.exit(1)
print(f"[INFO] LDAP server : {cfg['server']} base_dn : {cfg['base_dn']}")
try:
members = fetch_group_members(cfg, args.group)
except Exception as e:
print(f"[ERR] LDAP search failed : {e}")
sys.exit(2)
print(f"[INFO] Membres AD retrouves : {len(members)}")
for m in members[:5]:
print(f" {m['username']:20s} {m['email']:40s} {m['display_name']}")
if len(members) > 5:
print(f" ... ({len(members) - 5} autres)")
if args.dry_run:
print("[DRY-RUN] Aucun write")
return
inserted_u = updated_u = created_c = linked_itop = linked_contact = 0
with engine.begin() as conn:
for m in members:
# 1. Contact : find or create (stocke ldap_dn pour tracer source AD)
row = conn.execute(SQL_FIND_CONTACT, {"email": m["email"]}).fetchone()
if row:
contact_id, itop_id = row
# Mettre a jour ldap_dn si contact deja existant sans la trace
conn.execute(text(
"UPDATE contacts SET ldap_dn = COALESCE(ldap_dn, :dn) WHERE id = :cid"
), {"dn": m["dn"], "cid": contact_id})
else:
r = conn.execute(SQL_INSERT_CONTACT, {
"name": m["display_name"],
"email": m["email"],
"ldap_dn": m["dn"],
}).fetchone()
contact_id, itop_id = r
created_c += 1
# 2. User : upsert + lien contact_id (FK) + itop_person_id (historique)
u = conn.execute(SQL_FIND_USER, {"username": m["username"]}).fetchone()
params = {
"username": m["username"],
"display_name": m["display_name"],
"email": m["email"],
"contact_id": contact_id,
"itop_pid": itop_id, # None si contact sans itop_id
}
if u:
conn.execute(SQL_UPDATE_USER, {**params, "uid": u[0]})
updated_u += 1
else:
conn.execute(SQL_INSERT_USER, params)
inserted_u += 1
linked_contact += 1
if itop_id:
linked_itop += 1
print(f"[OK] Termine :")
print(f" users : INSERT {inserted_u} UPDATE {updated_u}")
print(f" contacts : CREATE {created_c}")
print(f" links : users.contact_id = {linked_contact}")
print(f" users.itop_person_id = {linked_itop}")
if __name__ == "__main__":
main()

View File

@ -30,6 +30,20 @@ DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db") or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
# Normalisation des noms d'intervenants (xlsx libre -> canonique)
INTERVENANT_MAP = {
"sophie/joel": "Joel",
"joel/sophie": "Joel",
}
def normalize_intervenant(name):
if not name:
return None
s = str(name).strip()
return INTERVENANT_MAP.get(s.lower(), s)
def is_green(cell): def is_green(cell):
"""True si la cellule a un fond vert (dominante G > R et G > B).""" """True si la cellule a un fond vert (dominante G > R et G > B)."""
if cell.fill is None or cell.fill.fgColor is None: if cell.fill is None or cell.fill.fgColor is None: