"""Aligne domains/environments sur les valeurs canoniques iTop SANEF. Pour chaque table: 1. Identifie les valeurs canoniques iTop (preserves) 2. Pour les valeurs proches (case/accent/suffixe num) -> fusionne vers le canonique 3. Migre les FK + supprime les doublons 4. Renomme aussi servers.environnement plain-text si applicable Canoniques : ENVS = Production, Recette, Test, Développement, Intégration, Pré-Prod, Formation DOMAINS = pas de canonique fixe, fusion case/accent + stem (Peages -> Péage) Usage: python tools/cleanup_referentiel.py [--dry-run] """ import os import re import argparse import unicodedata from sqlalchemy import create_engine, text DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" # Valeurs canoniques iTop pour environnements ITOP_ENVS_CANONICAL = [ "Production", "Recette", "Test", "Développement", "Intégration", "Pré-Prod", "Formation", ] # Aliases connus (apres normalisation lowercase/sans accent) ENV_ALIASES = { "production": "Production", "prod": "Production", "recette": "Recette", "rec": "Recette", "test": "Test", "tests": "Test", "test1": "Test", "test2": "Test", "test3": "Test", "developpement": "Développement", "dev": "Développement", "integration": "Intégration", "int": "Intégration", "preprod": "Pré-Prod", "pre-prod": "Pré-Prod", "preproduction": "Pré-Prod", "pre-production": "Pré-Prod", "formation": "Formation", } def norm_key(s): if not s: return "" nfkd = unicodedata.normalize("NFKD", s.strip()) ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c)) return ascii_str.lower() def env_canonical(name): """Retourne la version canonique iTop ou None si inconnu.""" if not name: return None k = norm_key(name).replace(" ", "") return ENV_ALIASES.get(k) def domain_stem(s): """Stem domaine: lowercase ascii sans 's' final (Peages -> peage).""" k = norm_key(s) return re.sub(r"s$", "", k) def cleanliness_score(name): """Plus le score est haut, plus le nom est 'propre' (keeper).""" has_upper = any(c.isupper() for c in name) has_accent = any(unicodedata.combining(c) for c in unicodedata.normalize("NFKD", name)) return (has_accent * 2 + has_upper, -len(name)) def merge_envs(conn, dry_run): """Fusionne envs vers canoniques iTop.""" rows = conn.execute(text("SELECT id, name FROM environments ORDER BY id")).fetchall() print(f"\n=== ENVIRONMENTS ({len(rows)}) ===") # Group: canonical_target -> [(id, name), ...] groups = {} leftovers = [] for r in rows: canon = env_canonical(r.name) if canon: groups.setdefault(canon, []).append((r.id, r.name)) else: leftovers.append((r.id, r.name)) merged = 0 for canon, items in groups.items(): if len(items) <= 1 and items[0][1] == canon: continue # Trouve le keeper (celui dont le name == canonical) keeper = next((it for it in items if it[1] == canon), None) if not keeper: # Aucun n'a le nom exact: on renomme le 1er en canonique keeper = items[0] print(f" RENAME env id={keeper[0]} '{keeper[1]}' -> '{canon}'") if not dry_run: conn.execute(text("UPDATE environments SET name=:n WHERE id=:id"), {"n": canon, "id": keeper[0]}) for dup_id, dup_name in items: if dup_id == keeper[0]: continue cnt_de = conn.execute(text( "SELECT COUNT(*) FROM domain_environments WHERE environment_id=:e" ), {"e": dup_id}).scalar() cnt_srv = conn.execute(text( "SELECT COUNT(*) FROM servers WHERE environnement=:n" ), {"n": dup_name}).scalar() print(f" MERGE env '{dup_name}' (id={dup_id}, {cnt_de} assocs, " f"{cnt_srv} servers) -> '{canon}'") if not dry_run: # Migre domain_environments (gere les conflits keeper/env) assocs = conn.execute(text( "SELECT id, domain_id FROM domain_environments WHERE environment_id=:e" ), {"e": dup_id}).fetchall() for a in assocs: existing = conn.execute(text( "SELECT id FROM domain_environments " "WHERE domain_id=:d AND environment_id=:e" ), {"d": a.domain_id, "e": keeper[0]}).fetchone() if existing: conn.execute(text( "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old" ), {"new": existing.id, "old": a.id}) conn.execute(text("DELETE FROM domain_environments WHERE id=:id"), {"id": a.id}) else: conn.execute(text( "UPDATE domain_environments SET environment_id=:k WHERE id=:id" ), {"k": keeper[0], "id": a.id}) # Renomme servers.environnement plain-text conn.execute(text( "UPDATE servers SET environnement=:n WHERE environnement=:o" ), {"n": canon, "o": dup_name}) conn.execute(text("DELETE FROM environments WHERE id=:id"), {"id": dup_id}) merged += 1 if leftovers: print(f" [LEFTOVERS non-iTop ({len(leftovers)})] : " f"{[l[1] for l in leftovers]}") print(" -> A traiter manuellement via /referentiel ou supprimer si vide") print(f" Envs fusionnes: {merged}") return merged def merge_domains(conn, dry_run): """Fusionne domaines case/accent + stem (Peages -> Péage).""" rows = conn.execute(text("SELECT id, name, code FROM domains ORDER BY id")).fetchall() print(f"\n=== DOMAINS ({len(rows)}) ===") groups = {} for r in rows: k = domain_stem(r.name) groups.setdefault(k, []).append((r.id, r.name, r.code)) merged = 0 for k, items in groups.items(): if len(items) <= 1: continue items.sort(key=lambda x: cleanliness_score(x[1]), reverse=True) keeper_id, keeper_name, keeper_code = items[0] print(f"\n [GROUP stem='{k}'] keeper='{keeper_name}' (id={keeper_id})") for dup_id, dup_name, dup_code in items[1:]: cnt_de = conn.execute(text( "SELECT COUNT(*) FROM domain_environments WHERE domain_id=:d" ), {"d": dup_id}).scalar() print(f" MERGE '{dup_name}' (id={dup_id}, {cnt_de} assocs) -> '{keeper_name}'") if not dry_run: assocs = conn.execute(text( "SELECT id, environment_id FROM domain_environments WHERE domain_id=:d" ), {"d": dup_id}).fetchall() for a in assocs: existing = conn.execute(text( "SELECT id FROM domain_environments " "WHERE domain_id=:k AND environment_id=:e" ), {"k": keeper_id, "e": a.environment_id}).fetchone() if existing: conn.execute(text( "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old" ), {"new": existing.id, "old": a.id}) conn.execute(text("DELETE FROM domain_environments WHERE id=:id"), {"id": a.id}) else: conn.execute(text( "UPDATE domain_environments SET domain_id=:k WHERE id=:id" ), {"k": keeper_id, "id": a.id}) conn.execute(text("DELETE FROM domains WHERE id=:id"), {"id": dup_id}) merged += 1 print(f"\n Domaines fusionnes: {merged}") return merged def main(): parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") n_env = merge_envs(conn, args.dry_run) n_dom = merge_domains(conn, args.dry_run) conn.close() print(f"\n[DONE] {'(DRY) ' if args.dry_run else ''}envs: {n_env} | domains: {n_dom}") if __name__ == "__main__": main()