Add fill_emails_from_contacts: complete domain_environments emails depuis contacts

This commit is contained in:
Pierre & Lumière 2026-04-14 20:41:29 +02:00
parent 9f4d7707ef
commit 6c8e2a3339

View File

@ -0,0 +1,91 @@
"""Complete domain_environments.responsable_email / referent_email depuis contacts.
Match sur le nom (contacts.name responsable_nom ou referent_nom).
Usage:
python tools/fill_emails_from_contacts.py [--dry-run]
"""
import os
import argparse
import unicodedata
from sqlalchemy import create_engine, text
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
def normalize(s):
"""Normalise un nom: ASCII lowercase sans accent ni espaces multiples."""
if not s:
return ""
nfkd = unicodedata.normalize("NFKD", s.strip().lower())
ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c))
return " ".join(ascii_str.split())
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
# Cache contacts: normalized name -> email
contacts = conn.execute(text(
"SELECT name, email FROM contacts WHERE email IS NOT NULL AND email != ''"
)).fetchall()
name_to_email = {}
for c in contacts:
key = normalize(c.name)
if key and "no-email" not in (c.email or "").lower():
name_to_email[key] = c.email
print(f"[INFO] {len(name_to_email)} contacts avec email")
# Domain_environments a completer
rows = conn.execute(text("""
SELECT id, responsable_nom, responsable_email, referent_nom, referent_email
FROM domain_environments
WHERE (responsable_email IS NULL OR responsable_email = '' OR referent_email IS NULL OR referent_email = '')
""")).fetchall()
print(f"[INFO] {len(rows)} (dom,env) a completer")
updated = 0
unmatched = set()
for r in rows:
updates = {}
if r.responsable_nom and not (r.responsable_email or "").strip():
key = normalize(r.responsable_nom)
email = name_to_email.get(key)
if email:
updates["responsable_email"] = email
elif key:
unmatched.add(r.responsable_nom)
if r.referent_nom and not (r.referent_email or "").strip():
key = normalize(r.referent_nom)
email = name_to_email.get(key)
if email:
updates["referent_email"] = email
elif key:
unmatched.add(r.referent_nom)
if not updates:
continue
if args.dry_run:
print(f" DRY: de_id={r.id} {updates}")
else:
sets = ", ".join(f"{k}=:{k}" for k in updates)
params = dict(updates); params["id"] = r.id
conn.execute(text(f"UPDATE domain_environments SET {sets} WHERE id=:id"), params)
updated += 1
conn.close()
print(f"\n[DONE] Maj: {updated}")
if unmatched:
print(f"[WARN] Noms sans match dans contacts ({len(unmatched)}) : "
f"{sorted(unmatched)[:10]}...")
if __name__ == "__main__":
main()