patchcenter/tools/cleanup_referentiel.py
Admin MPCZ 55d1c2b43d Add cleanup_referentiel + --overwrite sur fill_emails
cleanup_referentiel: aligne envs sur canoniques iTop (Test1/Test2->Test,
Developpement->Développement, Pre-production->Pré-Prod) + fusion domains
avec stem (Peages->Péage).

fill_emails --overwrite: force la reecriture des emails existants.
2026-04-14 20:51:16 +02:00

215 lines
8.4 KiB
Python

"""Aligne domains/environments sur les valeurs canoniques iTop SANEF.
Pour chaque table:
1. Identifie les valeurs canoniques iTop (preserves)
2. Pour les valeurs proches (case/accent/suffixe num) -> fusionne vers le canonique
3. Migre les FK + supprime les doublons
4. Renomme aussi servers.environnement plain-text si applicable
Canoniques :
ENVS = Production, Recette, Test, Développement, Intégration, Pré-Prod, Formation
DOMAINS = pas de canonique fixe, fusion case/accent + stem (Peages -> Péage)
Usage:
python tools/cleanup_referentiel.py [--dry-run]
"""
import os
import re
import argparse
import unicodedata
from sqlalchemy import create_engine, text
# Connection URL: DATABASE_URL_DEMO takes precedence over DATABASE_URL; when
# neither is set (or both are empty) a local demo database is used.
# NOTE(review): the fallback URL embeds a password in source control --
# confirm this is a throw-away demo credential.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)
# Canonical iTop spellings of the environment names.
ITOP_ENVS_CANONICAL = [
    "Production",
    "Recette",
    "Test",
    "Développement",
    "Intégration",
    "Pré-Prod",
    "Formation",
]
# Known spellings of each environment, listed per canonical iTop name.
# Aliases are written in normalised form: lowercase, without accents.
_ENV_ALIAS_GROUPS = (
    ("Production", ("production", "prod")),
    ("Recette", ("recette", "rec")),
    ("Test", ("test", "tests", "test1", "test2", "test3")),
    ("Développement", ("developpement", "dev")),
    ("Intégration", ("integration", "int")),
    ("Pré-Prod", ("preprod", "pre-prod", "preproduction", "pre-production")),
    ("Formation", ("formation",)),
)
# Flat lookup table: normalised alias -> canonical name.
ENV_ALIASES = {
    alias: canonical
    for canonical, aliases in _ENV_ALIAS_GROUPS
    for alias in aliases
}
def norm_key(s):
    """Return a comparison key for *s*: trimmed, accent-free and lowercase.

    Falsy input (``None`` or an empty string) yields ``""``.
    """
    if s:
        decomposed = unicodedata.normalize("NFKD", s.strip())
        # NFKD splits an accented letter into base letter + combining mark;
        # dropping the marks leaves the unaccented text.
        unaccented = "".join(
            ch for ch in decomposed if unicodedata.combining(ch) == 0
        )
        return unaccented.lower()
    return ""
def env_canonical(name):
    """Map an environment name to its canonical iTop spelling.

    Returns ``None`` when *name* is empty or is not a known alias.
    """
    if name:
        # Spaces are removed after normalisation so that spaced spellings
        # hit the same alias entry as their compact form.
        alias = norm_key(name).replace(" ", "")
        return ENV_ALIASES.get(alias)
    return None
def domain_stem(s):
    """Return the grouping stem of a domain name.

    The stem is the normalised key (lowercase, accent-free) with one
    trailing ``s`` removed, e.g. ``Peages`` -> ``peage``.
    """
    key = norm_key(s)
    if key.endswith("s"):
        return key[:-1]
    return key
def cleanliness_score(name):
    """Rank *name* by how 'clean' it looks; the highest score is the keeper.

    Returns a ``(quality, -len(name))`` tuple. ``quality`` is 2 when the name
    contains an accented character, plus 1 when it contains an uppercase
    character. On equal quality the shorter name scores higher.
    """
    quality = 0
    decomposed = unicodedata.normalize("NFKD", name)
    if any(unicodedata.combining(ch) for ch in decomposed):
        quality += 2
    if any(ch.isupper() for ch in name):
        quality += 1
    return (quality, -len(name))
def _move_env_links(conn, dup_id, keeper_id):
    """Re-point the domain_environments rows of env *dup_id* to *keeper_id*."""
    assocs = conn.execute(text(
        "SELECT id, domain_id FROM domain_environments WHERE environment_id=:e"
    ), {"e": dup_id}).fetchall()
    for a in assocs:
        existing = conn.execute(text(
            "SELECT id FROM domain_environments "
            "WHERE domain_id=:d AND environment_id=:e"
        ), {"d": a.domain_id, "e": keeper_id}).fetchone()
        if existing:
            # The keeper already has a link for this domain: move the servers
            # onto it, then drop the now-redundant link.
            conn.execute(text(
                "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old"
            ), {"new": existing.id, "old": a.id})
            conn.execute(text("DELETE FROM domain_environments WHERE id=:id"),
                         {"id": a.id})
        else:
            conn.execute(text(
                "UPDATE domain_environments SET environment_id=:k WHERE id=:id"
            ), {"k": keeper_id, "id": a.id})


def merge_envs(conn, dry_run):
    """Merge ``environments`` rows into their canonical iTop names.

    Rows are grouped by the canonical name returned by ``env_canonical``.
    In each group the row already carrying the canonical name is kept; if
    there is none, the first row (lowest id) is renamed to it. Every other
    row of the group is merged into the keeper: its ``domain_environments``
    links are moved or de-duplicated, ``servers.domain_env_id`` and the
    plain-text ``servers.environnement`` column are re-pointed, and the row
    is deleted. Names without a known alias are only listed for manual
    handling.

    Args:
        conn: SQLAlchemy connection used for every statement.
        dry_run: when true, print the planned actions without writing.

    Returns:
        Number of duplicate rows merged (plain renames are not counted).
    """
    rows = conn.execute(text("SELECT id, name FROM environments ORDER BY id")).fetchall()
    print(f"\n=== ENVIRONMENTS ({len(rows)}) ===")

    # canonical name -> [(id, name), ...]; unknown names go to leftovers.
    groups = {}
    leftovers = []
    for r in rows:
        canon = env_canonical(r.name)
        if canon:
            groups.setdefault(canon, []).append((r.id, r.name))
        else:
            leftovers.append((r.id, r.name))

    merged = 0
    for canon, items in groups.items():
        # A single row that is already canonical needs nothing.
        if len(items) <= 1 and items[0][1] == canon:
            continue

        # Prefer the row whose name is already the canonical one.
        keeper = next((it for it in items if it[1] == canon), None)
        if not keeper:
            # No exact match: promote the first row by renaming it.
            keeper = items[0]
            print(f" RENAME env id={keeper[0]} '{keeper[1]}' -> '{canon}'")
            if not dry_run:
                conn.execute(text("UPDATE environments SET name=:n WHERE id=:id"),
                             {"n": canon, "id": keeper[0]})
                # Keep the plain-text column in step with the rename;
                # otherwise servers still carry the keeper's old spelling.
                conn.execute(text(
                    "UPDATE servers SET environnement=:n WHERE environnement=:o"
                ), {"n": canon, "o": keeper[1]})

        for dup_id, dup_name in items:
            if dup_id == keeper[0]:
                continue
            cnt_de = conn.execute(text(
                "SELECT COUNT(*) FROM domain_environments WHERE environment_id=:e"
            ), {"e": dup_id}).scalar()
            cnt_srv = conn.execute(text(
                "SELECT COUNT(*) FROM servers WHERE environnement=:n"
            ), {"n": dup_name}).scalar()
            print(f" MERGE env '{dup_name}' (id={dup_id}, {cnt_de} assocs, "
                  f"{cnt_srv} servers) -> '{canon}'")
            if not dry_run:
                _move_env_links(conn, dup_id, keeper[0])
                # Rename the plain-text servers.environnement column too.
                conn.execute(text(
                    "UPDATE servers SET environnement=:n WHERE environnement=:o"
                ), {"n": canon, "o": dup_name})
                conn.execute(text("DELETE FROM environments WHERE id=:id"), {"id": dup_id})
            merged += 1

    if leftovers:
        print(f" [LEFTOVERS non-iTop ({len(leftovers)})] : "
              f"{[l[1] for l in leftovers]}")
        print(" -> A traiter manuellement via /referentiel ou supprimer si vide")
    print(f" Envs fusionnes: {merged}")
    return merged
def _move_domain_links(conn, dup_id, keeper_id):
    """Re-point the domain_environments rows of domain *dup_id* to *keeper_id*."""
    assocs = conn.execute(text(
        "SELECT id, environment_id FROM domain_environments WHERE domain_id=:d"
    ), {"d": dup_id}).fetchall()
    for a in assocs:
        existing = conn.execute(text(
            "SELECT id FROM domain_environments "
            "WHERE domain_id=:k AND environment_id=:e"
        ), {"k": keeper_id, "e": a.environment_id}).fetchone()
        if existing:
            # The keeper already has a link for this environment: move the
            # servers onto it, then drop the now-redundant link.
            conn.execute(text(
                "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old"
            ), {"new": existing.id, "old": a.id})
            conn.execute(text("DELETE FROM domain_environments WHERE id=:id"),
                         {"id": a.id})
        else:
            conn.execute(text(
                "UPDATE domain_environments SET domain_id=:k WHERE id=:id"
            ), {"k": keeper_id, "id": a.id})


def merge_domains(conn, dry_run):
    """Merge domains whose names differ only by case, accents or a final 's'.

    Rows of ``domains`` are grouped by ``domain_stem(name)``. In each group
    of two or more, the row with the highest ``cleanliness_score`` is kept
    (ties keep the lowest id, since rows are read ordered by id and the sort
    is stable). Every other row has its ``domain_environments`` links moved
    or de-duplicated onto the keeper and is then deleted.

    Rows whose stem is empty (NULL or blank name, or a bare "s") are skipped:
    they share no meaningful key, so grouping them would merge unrelated
    domains, and a NULL name cannot be scored.

    Args:
        conn: SQLAlchemy connection used for every statement.
        dry_run: when true, print the planned actions without writing.

    Returns:
        Number of duplicate domain rows merged.
    """
    rows = conn.execute(text("SELECT id, name, code FROM domains ORDER BY id")).fetchall()
    print(f"\n=== DOMAINS ({len(rows)}) ===")

    # stem -> [(id, name), ...]
    groups = {}
    for r in rows:
        k = domain_stem(r.name)
        if not k:
            print(f" SKIP domain id={r.id} name={r.name!r} (stem vide)")
            continue
        groups.setdefault(k, []).append((r.id, r.name))

    merged = 0
    for k, items in groups.items():
        if len(items) <= 1:
            continue
        # Cleanest spelling first; it becomes the keeper.
        items.sort(key=lambda x: cleanliness_score(x[1]), reverse=True)
        keeper_id, keeper_name = items[0]
        print(f"\n [GROUP stem='{k}'] keeper='{keeper_name}' (id={keeper_id})")
        for dup_id, dup_name in items[1:]:
            cnt_de = conn.execute(text(
                "SELECT COUNT(*) FROM domain_environments WHERE domain_id=:d"
            ), {"d": dup_id}).scalar()
            print(f" MERGE '{dup_name}' (id={dup_id}, {cnt_de} assocs) -> '{keeper_name}'")
            if not dry_run:
                _move_domain_links(conn, dup_id, keeper_id)
                conn.execute(text("DELETE FROM domains WHERE id=:id"), {"id": dup_id})
            merged += 1
    print(f"\n Domaines fusionnes: {merged}")
    return merged
def main():
    """Command-line entry point: merge environments, then domains.

    With ``--dry-run`` the planned actions are printed and nothing is
    written. Otherwise all changes run inside a single transaction: it is
    committed when both merges succeed and rolled back if any statement
    fails, so the foreign keys are never left half-migrated. The connection
    is released in every case.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Print only the part after '@' so credentials never reach the console.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")

    # engine.begin() opens a connection and a transaction, commits on normal
    # exit, rolls back on exception, and always closes the connection.
    with engine.begin() as conn:
        n_env = merge_envs(conn, args.dry_run)
        n_dom = merge_domains(conn, args.dry_run)

    print(f"\n[DONE] {'(DRY) ' if args.dry_run else ''}envs: {n_env} | domains: {n_dom}")


if __name__ == "__main__":
    main()