"""Verifie + etablit les 3 liens : patch_history <-> users <-> contacts. Contexte : - patch_history.intervenant_name : texte libre venant du xlsx (ex "Khalid", "Mouaad") - users.id : FK cible pour patch_history.intervenant_id - users.contact_id : FK vers contacts.id - contacts.ldap_dn : trace source AD Matching : on tente d'apparier patch_history.intervenant_name a users.display_name (ex "Khalid" -> "MOUTAOUAKIL-ext Khalid (admin)") en cherchant le prenom comme token. Usage : python tools/link_patch_history_intervenants.py # verif seule python tools/link_patch_history_intervenants.py --apply # UPDATE FK """ import os import sys import argparse from sqlalchemy import create_engine, text DATABASE_URL = (os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db") def report_state(conn): print("\n=== ETAT ACTUEL DES 3 TABLES ===") r = conn.execute(text(""" SELECT (SELECT COUNT(*) FROM users) AS users_total, (SELECT COUNT(*) FROM users WHERE auth_type='ldap') AS users_ldap, (SELECT COUNT(*) FROM users WHERE contact_id IS NOT NULL) AS users_with_contact, (SELECT COUNT(*) FROM contacts) AS contacts_total, (SELECT COUNT(*) FROM contacts WHERE ldap_dn IS NOT NULL) AS contacts_with_ldap, (SELECT COUNT(*) FROM patch_history) AS ph_total, (SELECT COUNT(*) FROM patch_history WHERE intervenant_name IS NOT NULL) AS ph_with_name, (SELECT COUNT(*) FROM patch_history WHERE intervenant_id IS NOT NULL) AS ph_with_user_fk """)).fetchone() print(f" users : total={r.users_total} | ldap={r.users_ldap} | lie contact={r.users_with_contact}") print(f" contacts: total={r.contacts_total} | avec ldap_dn={r.contacts_with_ldap}") print(f" patch_history : total={r.ph_total} | avec intervenant_name={r.ph_with_name} " f"| avec intervenant_id (FK users)={r.ph_with_user_fk}") print("\n=== DISTRIBUTION patch_history.intervenant_name ===") for row in conn.execute(text(""" SELECT intervenant_name, COUNT(*) AS n FROM patch_history WHERE intervenant_name IS NOT NULL GROUP BY 1 ORDER BY 2 DESC """)).fetchall(): print(f" {row.n:5d} {row.intervenant_name}") print("\n=== USERS LDAP (candidats FK) ===") for row in conn.execute(text(""" SELECT u.username, u.display_name, u.email, c.name AS contact_name, CASE WHEN c.ldap_dn IS NOT NULL THEN 'LDAP' ELSE '-' END AS src FROM users u LEFT JOIN contacts c ON u.contact_id=c.id WHERE u.auth_type='ldap' ORDER BY u.username """)).fetchall(): print(f" {row.username:15s} | {row.display_name or '-':45s} | {row.email:30s} | {row.src}") def propose_mapping(conn): """Retourne dict {intervenant_name -> user_id} en matchant par prenom.""" users = conn.execute(text(""" SELECT id, username, display_name FROM users WHERE auth_type='ldap' """)).fetchall() names = conn.execute(text(""" SELECT DISTINCT intervenant_name FROM patch_history WHERE intervenant_name IS NOT NULL """)).fetchall() mapping = {} for name_row in names: n = name_row.intervenant_name if not n: continue n_lo = n.strip().lower() # Matchs d'exclusion - collectives if n_lo in ("secops", "secops-team", "secops team"): continue candidates = [] for u in users: dn = (u.display_name or "").lower() # Token match : "khalid" dans "moutaouakil-ext khalid (admin)" if f" {n_lo} " in f" {dn} " or dn.endswith(f" {n_lo}") or dn.startswith(f"{n_lo} "): candidates.append(u) # Cas Joel : display_name peut contenir "Joël" avec accent elif n_lo == "joel" and ("joël" in dn or "joel" in dn): candidates.append(u) if len(candidates) == 1: mapping[n] = candidates[0].id elif len(candidates) > 1: print(f" [AMBIG] '{n}' matche {len(candidates)} users : {[c.username for c in candidates]}") else: print(f" [MISS] '{n}' -> aucun user LDAP trouve (peut-etre pas dans groupe secops)") return mapping def main(): parser = argparse.ArgumentParser() parser.add_argument("--apply", action="store_true", help="Applique vraiment le UPDATE FK (par defaut : dry-run verif)") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}") with engine.begin() as conn: report_state(conn) print("\n=== MATCHING intervenant_name -> users.id ===") mapping = propose_mapping(conn) print(f"\n {len(mapping)} correspondance(s) unique(s) trouvees :") for name, uid in mapping.items(): u = conn.execute(text("SELECT display_name, username FROM users WHERE id=:i"), {"i": uid}).fetchone() print(f" '{name}' -> #{uid} {u.username} ({u.display_name})") if not args.apply: print("\n[DRY-RUN] Rien ecrit. Relance avec --apply pour UPDATE patch_history.intervenant_id") return print("\n=== APPLY : UPDATE patch_history.intervenant_id ===") total_updated = 0 for name, uid in mapping.items(): r = conn.execute(text(""" UPDATE patch_history SET intervenant_id = :uid WHERE intervenant_name = :name AND intervenant_id IS NULL """), {"uid": uid, "name": name}) print(f" '{name}' -> user #{uid} : {r.rowcount} lignes") total_updated += r.rowcount print(f"\n[OK] Total UPDATE : {total_updated} lignes") # Re-verif apres print("\n=== ETAT APRES APPLY ===") report_state(conn) if __name__ == "__main__": main()