diff --git a/tools/import_sanef_applications.py b/tools/import_sanef_applications.py new file mode 100644 index 0000000..9ea0be3 --- /dev/null +++ b/tools/import_sanef_applications.py @@ -0,0 +1,137 @@ +"""Import Solutions Applicatives depuis CSV iTop -> table applications. + +Champs essentiels (max utile pour le patching): + - nom_court <- Nom + - nom_complet <- Nom complet + - description <- Description + - editeur <- Editeur->Nom + - status <- Etat (Actif/Obsolete/...) + - criticite <- Criticité (basse/standard/haute/critique) + - responsable_dsi <- Responsable de service - DSI (nom + email) + - admin_technique <- Administrateur technique (nom + email) + +Ajoute colonnes responsable_dsi_*/admin_tech_* si absentes. + +Usage: + python tools/import_sanef_applications.py [--truncate] [--dry-run] +""" +import os +import csv +import argparse +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def clean(v): + if v is None: + return None + s = str(v).strip().replace("\xa0", " ") + return s or None + + +CRIT_MAP = { + "critique": "critique", "haute": "haute", + "standard": "standard", "basse": "basse", "faible": "basse", +} + + +def norm_crit(v): + if not v: + return "standard" + s = v.strip().lower() + return CRIT_MAP.get(s, "standard") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("csv_path") + parser.add_argument("--truncate", action="store_true") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + # Colonnes supplementaires + conn.execute(text("ALTER TABLE applications ADD COLUMN IF NOT EXISTS responsable_dsi_nom varchar(200)")) + conn.execute(text("ALTER TABLE applications ADD COLUMN IF NOT EXISTS responsable_dsi_email varchar(255)")) + conn.execute(text("ALTER TABLE applications ADD COLUMN IF NOT EXISTS admin_tech_nom varchar(200)")) + conn.execute(text("ALTER TABLE applications ADD COLUMN IF NOT EXISTS admin_tech_email varchar(255)")) + conn.execute(text("ALTER TABLE applications ADD COLUMN IF NOT EXISTS etat varchar(30)")) + + if args.truncate and not args.dry_run: + conn.execute(text("TRUNCATE applications RESTART IDENTITY CASCADE")) + print("[INFO] applications vidée") + + with open(args.csv_path, "r", encoding="utf-8-sig", newline="") as f: + sample = f.read(4096); f.seek(0) + delim = ";" if sample.count(";") > sample.count(",") else "," + rows = list(csv.DictReader(f, delimiter=delim)) + print(f"[INFO] {len(rows)} lignes (delim={delim!r})") + + stats = {"inserted": 0, "updated": 0, "skipped": 0} + + for r in rows: + nom = clean(r.get("Nom")) + if not nom: + stats["skipped"] += 1 + continue + + vals = { + "nom_court": nom[:50], + "nom_complet": clean(r.get("Nom complet") or nom)[:200] if clean(r.get("Nom complet") or nom) else None, + "description": clean(r.get("Description")), + "editeur": (clean(r.get("Editeur->Nom")) or "")[:100] or None, + "criticite": norm_crit(r.get("Criticité")), + "etat": clean(r.get("Etat"))[:30] if clean(r.get("Etat")) else None, + "resp_nom": clean(r.get("Responsable de service - DSI->Nom complet") + or r.get("Responsable de service - DSI->Nom")), + "resp_email": clean(r.get("Responsable de service - DSI->Email")), + "admin_nom": clean(r.get("Administrateur technique->Nom complet") + or r.get("Administrateur technique->Nom")), + "admin_email": clean(r.get("Administrateur technique->Email")), + } + + if args.dry_run: + print(f" DRY: {vals['nom_court']:30s} [{vals['etat'] or '-'}] " + f"resp={vals['resp_nom'] or '-'}") + continue + + try: + existing = conn.execute(text("SELECT id FROM applications WHERE nom_court=:n"), + {"n": vals["nom_court"]}).fetchone() + if existing: + conn.execute(text(""" + UPDATE applications SET + nom_complet=:nom_complet, description=:description, + editeur=:editeur, criticite=:criticite, etat=:etat, + responsable_dsi_nom=:resp_nom, responsable_dsi_email=:resp_email, + admin_tech_nom=:admin_nom, admin_tech_email=:admin_email, + updated_at=now() + WHERE id=:id + """), {**vals, "id": existing.id}) + stats["updated"] += 1 + else: + conn.execute(text(""" + INSERT INTO applications + (nom_court, nom_complet, description, editeur, criticite, etat, + responsable_dsi_nom, responsable_dsi_email, + admin_tech_nom, admin_tech_email) + VALUES (:nom_court, :nom_complet, :description, :editeur, + :criticite, :etat, + :resp_nom, :resp_email, :admin_nom, :admin_email) + """), vals) + stats["inserted"] += 1 + except Exception as e: + print(f" [ERR] {vals['nom_court']}: {str(e)[:120]}") + stats["skipped"] += 1 + + conn.close() + print(f"\n[DONE] Inserts: {stats['inserted']} | Updates: {stats['updated']} | Skip: {stats['skipped']}") + + +if __name__ == "__main__": + main()