cleanup_referentiel: aligne envs sur canoniques iTop (Test1/Test2 -> Test, Developpement -> Développement, Pre-production -> Pré-Prod) + fusion domains avec stem (Peages -> Péage). fill_emails --overwrite : force la réécriture des emails existants.
215 lines · 8.4 KiB · Python
"""Aligne domains/environments sur les valeurs canoniques iTop SANEF.
|
|
|
|
Pour chaque table:
|
|
1. Identifie les valeurs canoniques iTop (preserves)
|
|
2. Pour les valeurs proches (case/accent/suffixe num) -> fusionne vers le canonique
|
|
3. Migre les FK + supprime les doublons
|
|
4. Renomme aussi servers.environnement plain-text si applicable
|
|
|
|
Canoniques :
|
|
ENVS = Production, Recette, Test, Développement, Intégration, Pré-Prod, Formation
|
|
DOMAINS = pas de canonique fixe, fusion case/accent + stem (Peages -> Péage)
|
|
|
|
Usage:
|
|
python tools/cleanup_referentiel.py [--dry-run]
|
|
"""
|
|
import os
|
|
import re
|
|
import argparse
|
|
import unicodedata
|
|
from sqlalchemy import create_engine, text
|
|
|
|
# Connection string resolution: demo DB env var first, then the generic
# one, then a local fallback.
# NOTE(review): hard-coded credentials in source — consider requiring the
# env var instead of shipping a default password in the repository.
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"

# Canonical iTop values for environments. Documents the target set; the
# alias mapping below is what actually drives the merge.
ITOP_ENVS_CANONICAL = [
    "Production", "Recette", "Test", "Développement",
    "Intégration", "Pré-Prod", "Formation",
]

# Known aliases, keyed by the normalized form (lowercase, accents stripped,
# spaces removed — see norm_key / env_canonical).
ENV_ALIASES = {
    "production": "Production", "prod": "Production",
    "recette": "Recette", "rec": "Recette",
    "test": "Test", "tests": "Test",
    "test1": "Test", "test2": "Test", "test3": "Test",
    "developpement": "Développement", "dev": "Développement",
    "integration": "Intégration", "int": "Intégration",
    "preprod": "Pré-Prod", "pre-prod": "Pré-Prod",
    "preproduction": "Pré-Prod", "pre-production": "Pré-Prod",
    "formation": "Formation",
}
|
|
|
|
|
|
def norm_key(s):
    """Normalization key: trimmed, accent-free, lowercase ASCII.

    Empty/None input yields "".
    """
    if not s:
        return ""
    # NFKD splits accented characters into base + combining marks,
    # which are then filtered out.
    decomposed = unicodedata.normalize("NFKD", s.strip())
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return "".join(base_chars).lower()
|
|
|
|
|
|
def env_canonical(name):
    """Return the canonical iTop environment for *name*, or None if unknown."""
    if name:
        # Normalize and drop inner spaces so e.g. "Pre Prod" matches "preprod".
        lookup_key = norm_key(name).replace(" ", "")
        return ENV_ALIASES.get(lookup_key)
    return None
|
|
|
|
|
|
def domain_stem(s):
    """Domain stem: lowercase ASCII with a single trailing 's' removed.

    E.g. "Peages" -> "peage", so singular/plural variants group together.
    """
    key = norm_key(s)
    # Equivalent to re.sub(r"s$", "", key): strip at most one final 's'.
    return key[:-1] if key.endswith("s") else key
|
|
|
|
|
|
def cleanliness_score(name):
    """Sort key for picking a keeper: higher tuple = 'cleaner' name.

    Accented names beat unaccented (weight 2), uppercase beats lowercase
    (weight 1); ties are broken by preferring the shorter name.
    """
    upper_present = 1 if any(ch.isupper() for ch in name) else 0
    decomposed = unicodedata.normalize("NFKD", name)
    accent_present = 1 if any(unicodedata.combining(ch) for ch in decomposed) else 0
    return (2 * accent_present + upper_present, -len(name))
|
|
|
|
|
|
def merge_envs(conn, dry_run: bool) -> int:
    """Merge environment rows into the canonical iTop names.

    Groups rows by their canonical target (via env_canonical), keeps or
    renames one row per group, repoints FKs from duplicates to the keeper,
    then deletes the duplicates. With dry_run, only prints what would happen.
    Returns the number of duplicate rows merged away.
    """
    rows = conn.execute(text("SELECT id, name FROM environments ORDER BY id")).fetchall()
    print(f"\n=== ENVIRONMENTS ({len(rows)}) ===")

    # Bucket rows: canonical_target -> [(id, name), ...].
    # Rows with no known canonical mapping are collected as leftovers.
    groups = {}
    leftovers = []
    for r in rows:
        canon = env_canonical(r.name)
        if canon:
            groups.setdefault(canon, []).append((r.id, r.name))
        else:
            leftovers.append((r.id, r.name))

    merged = 0
    for canon, items in groups.items():
        # Single row already spelled canonically: nothing to do.
        if len(items) <= 1 and items[0][1] == canon:
            continue
        # Keeper = the row whose name already equals the canonical spelling.
        keeper = next((it for it in items if it[1] == canon), None)
        if not keeper:
            # No row has the exact canonical name: rename the first one.
            keeper = items[0]
            print(f" RENAME env id={keeper[0]} '{keeper[1]}' -> '{canon}'")
            if not dry_run:
                conn.execute(text("UPDATE environments SET name=:n WHERE id=:id"),
                             {"n": canon, "id": keeper[0]})

        for dup_id, dup_name in items:
            if dup_id == keeper[0]:
                continue
            # Counts are informational only (shown in the MERGE line below).
            cnt_de = conn.execute(text(
                "SELECT COUNT(*) FROM domain_environments WHERE environment_id=:e"
            ), {"e": dup_id}).scalar()
            cnt_srv = conn.execute(text(
                "SELECT COUNT(*) FROM servers WHERE environnement=:n"
            ), {"n": dup_name}).scalar()
            print(f" MERGE env '{dup_name}' (id={dup_id}, {cnt_de} assocs, "
                  f"{cnt_srv} servers) -> '{canon}'")
            if not dry_run:
                # Migrate domain_environments rows, resolving conflicts with
                # associations the keeper already has for the same domain.
                assocs = conn.execute(text(
                    "SELECT id, domain_id FROM domain_environments WHERE environment_id=:e"
                ), {"e": dup_id}).fetchall()
                for a in assocs:
                    existing = conn.execute(text(
                        "SELECT id FROM domain_environments "
                        "WHERE domain_id=:d AND environment_id=:e"
                    ), {"d": a.domain_id, "e": keeper[0]}).fetchone()
                    if existing:
                        # Keeper already has this (domain, env) pair:
                        # repoint servers to it, then drop the duplicate assoc.
                        conn.execute(text(
                            "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old"
                        ), {"new": existing.id, "old": a.id})
                        conn.execute(text("DELETE FROM domain_environments WHERE id=:id"),
                                     {"id": a.id})
                    else:
                        # No conflict: simply retarget the assoc to the keeper.
                        conn.execute(text(
                            "UPDATE domain_environments SET environment_id=:k WHERE id=:id"
                        ), {"k": keeper[0], "id": a.id})
                # Also fix the plain-text servers.environnement column.
                conn.execute(text(
                    "UPDATE servers SET environnement=:n WHERE environnement=:o"
                ), {"n": canon, "o": dup_name})
                conn.execute(text("DELETE FROM environments WHERE id=:id"), {"id": dup_id})
            # Counted even in dry-run mode, so the summary reflects what
            # a real run would merge.
            merged += 1

    if leftovers:
        print(f" [LEFTOVERS non-iTop ({len(leftovers)})] : "
              f"{[l[1] for l in leftovers]}")
        print(" -> A traiter manuellement via /referentiel ou supprimer si vide")
    print(f" Envs fusionnes: {merged}")
    return merged
|
|
|
|
|
|
def merge_domains(conn, dry_run: bool) -> int:
    """Merge domain rows that share the same stem (case/accent/plural variants).

    Groups rows by domain_stem, keeps the 'cleanest' spelling per group
    (per cleanliness_score: accents > uppercase, then shortest), repoints
    associations from duplicates to the keeper, then deletes the duplicates.
    With dry_run, only prints what would happen. Returns the merge count.
    """
    rows = conn.execute(text("SELECT id, name, code FROM domains ORDER BY id")).fetchall()
    print(f"\n=== DOMAINS ({len(rows)}) ===")

    # Bucket by normalized stem: stem -> [(id, name, code), ...].
    groups = {}
    for r in rows:
        k = domain_stem(r.name)
        groups.setdefault(k, []).append((r.id, r.name, r.code))

    merged = 0
    for k, items in groups.items():
        if len(items) <= 1:
            continue
        # Best-scoring name first; that row becomes the keeper.
        items.sort(key=lambda x: cleanliness_score(x[1]), reverse=True)
        keeper_id, keeper_name, keeper_code = items[0]
        print(f"\n [GROUP stem='{k}'] keeper='{keeper_name}' (id={keeper_id})")
        # dup_code is unpacked for symmetry but not used below.
        for dup_id, dup_name, dup_code in items[1:]:
            # Informational count only (shown in the MERGE line).
            cnt_de = conn.execute(text(
                "SELECT COUNT(*) FROM domain_environments WHERE domain_id=:d"
            ), {"d": dup_id}).scalar()
            print(f" MERGE '{dup_name}' (id={dup_id}, {cnt_de} assocs) -> '{keeper_name}'")
            if not dry_run:
                # Migrate associations, resolving conflicts with pairs the
                # keeper already has for the same environment.
                assocs = conn.execute(text(
                    "SELECT id, environment_id FROM domain_environments WHERE domain_id=:d"
                ), {"d": dup_id}).fetchall()
                for a in assocs:
                    existing = conn.execute(text(
                        "SELECT id FROM domain_environments "
                        "WHERE domain_id=:k AND environment_id=:e"
                    ), {"k": keeper_id, "e": a.environment_id}).fetchone()
                    if existing:
                        # Keeper already has this (domain, env) pair:
                        # repoint servers to it, then drop the duplicate assoc.
                        conn.execute(text(
                            "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old"
                        ), {"new": existing.id, "old": a.id})
                        conn.execute(text("DELETE FROM domain_environments WHERE id=:id"),
                                     {"id": a.id})
                    else:
                        # No conflict: retarget the assoc to the keeper domain.
                        conn.execute(text(
                            "UPDATE domain_environments SET domain_id=:k WHERE id=:id"
                        ), {"k": keeper_id, "id": a.id})
                conn.execute(text("DELETE FROM domains WHERE id=:id"), {"id": dup_id})
            # Counted even in dry-run mode for an accurate summary.
            merged += 1

    print(f"\n Domaines fusionnes: {merged}")
    return merged
|
|
|
|
|
|
def main():
    """CLI entry point: parse --dry-run, connect, run both merge passes.

    Uses AUTOCOMMIT so each statement is applied immediately (no single
    enclosing transaction). Prints a summary of merged envs/domains.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Only show host/db, never the credentials part of the URL.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
    conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
    try:
        n_env = merge_envs(conn, args.dry_run)
        n_dom = merge_domains(conn, args.dry_run)
    finally:
        # Fix: previously the connection leaked if a merge raised.
        conn.close()

    print(f"\n[DONE] {'(DRY) ' if args.dry_run else ''}envs: {n_env} | domains: {n_dom}")


if __name__ == "__main__":
    main()
|