Add link_patch_history_intervenants : lie patch_history.intervenant_name -> users.id (FK)

This commit is contained in:
Pierre & Lumière 2026-04-17 12:31:40 +00:00
parent 7ec7c49c34
commit 81227833c1

View File

@ -0,0 +1,141 @@
"""Verifie + etablit les 3 liens : patch_history <-> users <-> contacts.
Contexte :
- patch_history.intervenant_name : texte libre venant du xlsx (ex "Khalid", "Mouaad")
- users.id : FK cible pour patch_history.intervenant_id
- users.contact_id : FK vers contacts.id
- contacts.ldap_dn : trace source AD
Matching : on tente d'apparier patch_history.intervenant_name a users.display_name
(ex "Khalid" -> "MOUTAOUAKIL-ext Khalid (admin)") en cherchant le prenom comme token.
Usage :
python tools/link_patch_history_intervenants.py # verif seule
python tools/link_patch_history_intervenants.py --apply # UPDATE FK
"""
import os
import sys
import argparse
from sqlalchemy import create_engine, text
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def report_state(conn):
print("\n=== ETAT ACTUEL DES 3 TABLES ===")
r = conn.execute(text("""
SELECT
(SELECT COUNT(*) FROM users) AS users_total,
(SELECT COUNT(*) FROM users WHERE auth_type='ldap') AS users_ldap,
(SELECT COUNT(*) FROM users WHERE contact_id IS NOT NULL) AS users_with_contact,
(SELECT COUNT(*) FROM contacts) AS contacts_total,
(SELECT COUNT(*) FROM contacts WHERE ldap_dn IS NOT NULL) AS contacts_with_ldap,
(SELECT COUNT(*) FROM patch_history) AS ph_total,
(SELECT COUNT(*) FROM patch_history WHERE intervenant_name IS NOT NULL) AS ph_with_name,
(SELECT COUNT(*) FROM patch_history WHERE intervenant_id IS NOT NULL) AS ph_with_user_fk
""")).fetchone()
print(f" users : total={r.users_total} | ldap={r.users_ldap} | lie contact={r.users_with_contact}")
print(f" contacts: total={r.contacts_total} | avec ldap_dn={r.contacts_with_ldap}")
print(f" patch_history : total={r.ph_total} | avec intervenant_name={r.ph_with_name} "
f"| avec intervenant_id (FK users)={r.ph_with_user_fk}")
print("\n=== DISTRIBUTION patch_history.intervenant_name ===")
for row in conn.execute(text("""
SELECT intervenant_name, COUNT(*) AS n
FROM patch_history WHERE intervenant_name IS NOT NULL
GROUP BY 1 ORDER BY 2 DESC
""")).fetchall():
print(f" {row.n:5d} {row.intervenant_name}")
print("\n=== USERS LDAP (candidats FK) ===")
for row in conn.execute(text("""
SELECT u.username, u.display_name, u.email, c.name AS contact_name,
CASE WHEN c.ldap_dn IS NOT NULL THEN 'LDAP' ELSE '-' END AS src
FROM users u LEFT JOIN contacts c ON u.contact_id=c.id
WHERE u.auth_type='ldap' ORDER BY u.username
""")).fetchall():
print(f" {row.username:15s} | {row.display_name or '-':45s} | {row.email:30s} | {row.src}")
def propose_mapping(conn):
"""Retourne dict {intervenant_name -> user_id} en matchant par prenom."""
users = conn.execute(text("""
SELECT id, username, display_name FROM users WHERE auth_type='ldap'
""")).fetchall()
names = conn.execute(text("""
SELECT DISTINCT intervenant_name FROM patch_history
WHERE intervenant_name IS NOT NULL
""")).fetchall()
mapping = {}
for name_row in names:
n = name_row.intervenant_name
if not n:
continue
n_lo = n.strip().lower()
# Matchs d'exclusion - collectives
if n_lo in ("secops", "secops-team", "secops team"):
continue
candidates = []
for u in users:
dn = (u.display_name or "").lower()
# Token match : "khalid" dans "moutaouakil-ext khalid (admin)"
if f" {n_lo} " in f" {dn} " or dn.endswith(f" {n_lo}") or dn.startswith(f"{n_lo} "):
candidates.append(u)
# Cas Joel : display_name peut contenir "Joël" avec accent
elif n_lo == "joel" and ("joël" in dn or "joel" in dn):
candidates.append(u)
if len(candidates) == 1:
mapping[n] = candidates[0].id
elif len(candidates) > 1:
print(f" [AMBIG] '{n}' matche {len(candidates)} users : {[c.username for c in candidates]}")
else:
print(f" [MISS] '{n}' -> aucun user LDAP trouve (peut-etre pas dans groupe secops)")
return mapping
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true",
help="Applique vraiment le UPDATE FK (par defaut : dry-run verif)")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
with engine.begin() as conn:
report_state(conn)
print("\n=== MATCHING intervenant_name -> users.id ===")
mapping = propose_mapping(conn)
print(f"\n {len(mapping)} correspondance(s) unique(s) trouvees :")
for name, uid in mapping.items():
u = conn.execute(text("SELECT display_name, username FROM users WHERE id=:i"),
{"i": uid}).fetchone()
print(f" '{name}' -> #{uid} {u.username} ({u.display_name})")
if not args.apply:
print("\n[DRY-RUN] Rien ecrit. Relance avec --apply pour UPDATE patch_history.intervenant_id")
return
print("\n=== APPLY : UPDATE patch_history.intervenant_id ===")
total_updated = 0
for name, uid in mapping.items():
r = conn.execute(text("""
UPDATE patch_history SET intervenant_id = :uid
WHERE intervenant_name = :name AND intervenant_id IS NULL
"""), {"uid": uid, "name": name})
print(f" '{name}' -> user #{uid} : {r.rowcount} lignes")
total_updated += r.rowcount
print(f"\n[OK] Total UPDATE : {total_updated} lignes")
# Re-verif apres
print("\n=== ETAT APRES APPLY ===")
report_state(conn)
if __name__ == "__main__":
main()