diff --git a/tools/cleanup_domains.py b/tools/cleanup_domains.py new file mode 100644 index 0000000..158245b --- /dev/null +++ b/tools/cleanup_domains.py @@ -0,0 +1,95 @@ +"""Dedoublonne la table domains (case/accent-insensitive). + +Ex: 'Flux Libre'/'flux libre' -> 'Flux Libre' + 'Péage'/'peage'/'PeagE' -> 'Péage' + +Garde la forme la plus 'propre' (avec accents + capitale), migre les FK +domain_environments vers le keeper, supprime les doublons. + +Usage: + python tools/cleanup_domains.py [--dry-run] +""" +import os +import argparse +import unicodedata +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def norm_key(s): + if not s: + return "" + nfkd = unicodedata.normalize("NFKD", s.strip()) + ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c)) + return ascii_str.lower() + + +def cleanliness_score(name): + """Plus le score est haut, plus le nom est 'propre' (keeper).""" + has_upper = any(c.isupper() for c in name) + has_accent = any(unicodedata.combining(c) for c in unicodedata.normalize("NFKD", name)) + return (has_upper + has_accent * 2, -len(name)) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + rows = conn.execute(text("SELECT id, name, code FROM domains ORDER BY id")).fetchall() + groups = {} + for r in rows: + k = norm_key(r.name) + groups.setdefault(k, []).append((r.id, r.name, r.code)) + + merged = 0 + for k, items in groups.items(): + if len(items) <= 1: + continue + # Trier: score decroissant (le plus propre en 1er) + items.sort(key=lambda x: cleanliness_score(x[1]), reverse=True) + keeper_id, keeper_name, keeper_code = items[0] + print(f"\n[GROUP] key='{k}' -> garde '{keeper_name}' (id={keeper_id})") + for dup_id, dup_name, dup_code in items[1:]: + cnt = conn.execute(text( + "SELECT COUNT(*) FROM domain_environments WHERE domain_id=:d" + ), {"d": dup_id}).scalar() + print(f" MERGE: '{dup_name}' (id={dup_id}, {cnt} assocs) -> '{keeper_name}'") + if not args.dry_run: + # Migre les domain_environments vers le keeper (en evitant conflits) + assocs = conn.execute(text( + "SELECT id, environment_id FROM domain_environments WHERE domain_id=:d" + ), {"d": dup_id}).fetchall() + for a in assocs: + existing = conn.execute(text( + "SELECT id FROM domain_environments " + "WHERE domain_id=:k AND environment_id=:e" + ), {"k": keeper_id, "e": a.environment_id}).fetchone() + if existing: + # La paire (keeper, env) existe deja : on reattache les servers + conn.execute(text( + "UPDATE servers SET domain_env_id=:new WHERE domain_env_id=:old" + ), {"new": existing.id, "old": a.id}) + conn.execute(text("DELETE FROM domain_environments WHERE id=:id"), + {"id": a.id}) + else: + # On bascule le domain_id + conn.execute(text( + "UPDATE domain_environments SET domain_id=:k WHERE id=:id" + ), {"k": keeper_id, "id": a.id}) + # Supprime le domain doublon + conn.execute(text("DELETE FROM domains WHERE id=:id"), {"id": dup_id}) + merged += 1 + + conn.close() + print(f"\n[DONE] {'(DRY) ' if args.dry_run else ''}Doublons fusionnes: {merged}") + + +if __name__ == "__main__": + main() diff --git a/tools/fill_ssh_method_by_default.py b/tools/fill_ssh_method_by_default.py index b63b7bb..b22af86 100644 --- a/tools/fill_ssh_method_by_default.py +++ b/tools/fill_ssh_method_by_default.py @@ -40,6 +40,15 @@ def main(): print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + # Etend le CHECK constraint pour accepter les nouveaux modes SANEF + print("[INFO] Drop ancien CHECK ssh_method...") + conn.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_ssh_method_check")) + conn.execute(text( + "ALTER TABLE servers ADD CONSTRAINT servers_ssh_method_check " + "CHECK (ssh_method IN ('ssh_key', 'ssh_psmp', 'ssh_password', " + "'rdp_local', 'rdp_psmp', 'winrm', 'a_definir') OR ssh_method IS NULL)" + )) + where = "" if not args.overwrite: where = "AND (ssh_method IS NULL OR ssh_method = '' OR ssh_method = 'a_definir' OR ssh_method = 'ssh_key')"