"""Extend the ``etat`` CHECK constraint and re-map states from iTop CSV exports.

Adds the missing iTop "physical condition" states to the ``servers_etat_check``
constraint, then re-reads the iTop CSV exports (Server, VirtualMachine,
Hypervisor, Physical server) and updates only the ``etat`` column of servers
that already exist in the database.

Usage:
    python tools/fix_etat_extend.py CSV_PATH [CSV_PATH ...] [--dry-run]
"""
import os
import csv
import argparse
import unicodedata
from typing import Optional

from sqlalchemy import create_engine, text

# NOTE(review): the fallback DSN embeds a password in source control. It is
# kept for backward compatibility; prefer setting DATABASE_URL_DEMO or
# DATABASE_URL in the environment.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)

# Every iTop state label (lower-cased) -> normalized value stored in the DB.
ETAT_MAP = {
    "production": "production",
    "implémentation": "implementation",
    "implementation": "implementation",
    "stock": "stock",
    "obsolète": "obsolete",
    "obsolete": "obsolete",
    "eol": "eol",
    "a récupérer": "a_recuperer",
    "a recuperer": "a_recuperer",
    "à récupérer": "a_recuperer",
    "cassé": "casse",
    "casse": "casse",
    "cédé": "cede",
    "cede": "cede",
    "en panne": "en_panne",
    "nouveau": "nouveau",
    "perdu": "perdu",
    "recyclé": "recycle",
    "recycle": "recycle",
    "occasion": "occasion",
    "a détruire": "a_detruire",
    "à détruire": "a_detruire",
    "a detruire": "a_detruire",
    "volé": "vole",
    "vole": "vole",
}

# Sorted list of distinct normalized values; this is what the CHECK allows.
ALLOWED = sorted(set(ETAT_MAP.values()))

# Cell contents (after strip + lower) that mean "no state given".
_EMPTY_MARKERS = ("-", "(null)", "")


def strip_accents(s: str) -> str:
    """Return *s* with combining marks (Unicode category ``Mn``) removed."""
    decomposed = unicodedata.normalize("NFD", s)
    return "".join(c for c in decomposed if unicodedata.category(c) != "Mn")


def norm_etat(raw: Optional[str]) -> Optional[str]:
    """Map a raw iTop state label to its normalized value.

    Returns ``None`` when *raw* is empty, is one of the "no value" markers,
    or is not present in ``ETAT_MAP`` (with or without accents).
    """
    if not raw:
        return None
    key = raw.strip().lower()
    if key in _EMPTY_MARKERS:
        return None
    if key in ETAT_MAP:
        return ETAT_MAP[key]
    # Fallback: retry the lookup with accents stripped.
    no_acc = strip_accents(key)
    if no_acc in ETAT_MAP:
        return ETAT_MAP[no_acc]
    return None


def _extend_etat_constraint(engine) -> None:
    """Replace ``servers_etat_check`` so that it accepts every value in ALLOWED.

    Both statements run in a single transaction: if re-adding the constraint
    fails, the drop is rolled back instead of leaving the table unconstrained.
    """
    print("[INFO] Etend le CHECK constraint etat...")
    # ALLOWED is derived from the ETAT_MAP constant above, never from CSV
    # content, so interpolating it into the DDL is safe.
    allowed_sql = ", ".join(f"'{v}'" for v in ALLOWED)
    with engine.begin() as ddl:
        ddl.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_etat_check"))
        ddl.execute(text(f"ALTER TABLE servers ADD CONSTRAINT servers_etat_check CHECK (etat IN ({allowed_sql}) OR etat IS NULL)"))


def _load_rows(csv_path: str) -> list:
    """Read *csv_path* (UTF-8, optional BOM) and return its rows as dicts."""
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        return list(csv.DictReader(f))


def main() -> None:
    """Parse the CLI, extend the constraint, then update ``etat`` from the CSVs.

    With ``--dry-run`` nothing is written: the constraint is left untouched
    and the planned updates are only printed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_paths", nargs="+")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Print only the part after '@' so credentials are not echoed.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")

    # 1. Extend the CHECK constraint (a dry run must not alter the schema).
    if args.dry_run:
        print("[INFO] DRY-RUN: CHECK constraint etat non modifie")
    else:
        _extend_etat_constraint(engine)
    print(f"[INFO] Etats autorises: {ALLOWED}")

    # 2. Re-read the CSV files and update existing servers.
    updated = 0
    unchanged = 0
    not_found = 0
    unknown_etats = set()

    # The context manager closes the connection even if a file or query fails.
    with engine.connect() as raw_conn:
        conn = raw_conn.execution_options(isolation_level="AUTOCOMMIT")
        for csv_path in args.csv_paths:
            print(f"\n[INFO] Lecture {csv_path}")
            rows = _load_rows(csv_path)
            print(f"[INFO] {len(rows)} lignes")

            for row in rows:
                hostname = (row.get("Nom") or row.get("Hostname") or "").strip()
                # Skip rows whose name is empty or contains no letter at all.
                if not hostname or not any(c.isalpha() for c in hostname):
                    continue

                raw_etat = row.get("Etat") or row.get("État") or row.get("Status")
                new_etat = norm_etat(raw_etat)
                if new_etat is None:
                    # Report real but unmapped labels; "no value" markers are
                    # compared case-insensitively, exactly as norm_etat does.
                    if raw_etat and raw_etat.strip().lower() not in _EMPTY_MARKERS:
                        unknown_etats.add(raw_etat.strip())
                    continue

                srv = conn.execute(
                    text("SELECT id, etat FROM servers WHERE hostname=:h"),
                    {"h": hostname},
                ).fetchone()
                if not srv:
                    not_found += 1
                    continue
                if srv.etat == new_etat:
                    unchanged += 1
                    continue

                if args.dry_run:
                    print(f"  DRY: {hostname} {srv.etat} -> {new_etat}")
                else:
                    conn.execute(
                        text("UPDATE servers SET etat=:e WHERE id=:sid"),
                        {"e": new_etat, "sid": srv.id},
                    )
                # Counted in both modes: in a dry run this is the number of
                # rows that would be updated.
                updated += 1

    print(f"\n[DONE] Maj: {updated} | Inchanges: {unchanged} | Hors base: {not_found}")
    if unknown_etats:
        print(f"[WARN] Etats non mappes rencontres: {unknown_etats}")


if __name__ == "__main__":
    main()