From 338c0ecc0f8d37903c0fb3277dea89f12f38e3e6 Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Tue, 14 Apr 2026 13:24:42 +0200 Subject: [PATCH] Add SANEF asset CSV import script --- tools/import_sanef_assets.py | 159 +++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 tools/import_sanef_assets.py diff --git a/tools/import_sanef_assets.py b/tools/import_sanef_assets.py new file mode 100644 index 0000000..335dd03 --- /dev/null +++ b/tools/import_sanef_assets.py @@ -0,0 +1,159 @@ +"""Import SANEF assets CSV → table servers de PatchCenter. + +Usage: + python tools/import_sanef_assets.py [--truncate] + +Le CSV doit être séparé par des virgules, valeurs entre guillemets, +avec les colonnes telles qu'exportées d'iTop SANEF (Nom, Hostname, +Domaine, IP, Famille OS->Nom, Version OS->Nom, Lieu->Nom, etc.). +""" +import csv +import os +import sys +import argparse +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def norm_os_family(famille): + f = (famille or "").strip().lower() + if "windows" in f: + return "windows" + if "linux" in f or "oracle" in f: + return "linux" + return None + + +def norm_etat(status, etat): + s = (status or "").strip().lower() + e = (etat or "").strip().lower() + if "stock" in e: + return "stock" + if s == "recette" or e == "recette": + return "recette" + if "pr" in s and "prod" in s: + return "preprod" + if "veloppement" in s: + return "dev" + return "production" + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("csv_path") + parser.add_argument("--truncate", action="store_true") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + print(f"[INFO] CSV: {args.csv_path}") + + with open(args.csv_path, "r", encoding="utf-8-sig", newline="") as f: + reader = csv.DictReader(f) + rows = list(reader) + print(f"[INFO] {len(rows)} lignes lues dans le CSV") + + with engine.begin() as conn: + if args.truncate: + print("[INFO] Nettoyage des tables dépendantes + TRUNCATE servers") + if not args.dry_run: + for tbl in ("server_ips", "qualys_assets", "audits", + "session_servers", "planning_sessions", + "patch_history", "ref_correspondance"): + try: + conn.execute(text(f"DELETE FROM {tbl}")) + except Exception as e: + print(f" [WARN] DELETE {tbl}: {e}") + conn.execute(text("TRUNCATE servers RESTART IDENTITY CASCADE")) + + inserted = 0 + skipped = 0 + ip_map = [] + for r in rows: + hostname = (r.get("Nom") or r.get("Hostname") or "").strip() + if not hostname or hostname.startswith(("...", "#")): + skipped += 1 + continue + + os_family = norm_os_family(r.get("Famille OS->Nom") or r.get("Famille OS")) + os_version = (r.get("Version OS->Nom") or "").strip()[:200] or None + ip = (r.get("IP") or "").strip() or None + domain = (r.get("Domaine") or "").strip()[:50] or None + site = (r.get("Lieu->Nom") or "").strip()[:50] or None + etat = norm_etat(r.get("Status"), r.get("Etat")) + responsable = (r.get("Logiciel->Responsable Domaine DTS") or "").strip() or None + vcenter_vm = (r.get("vCluster / Hyperviseur->Nom") or "").strip()[:100] or None + desc = (r.get("Description") or "").strip() + fonction = (r.get("Fonction") or "").strip() + sar = (r.get("SAR description") or "").strip() + details = " | ".join(x for x in (fonction, desc, sar) if x)[:2000] or None + + fqdn = None + if domain and "." in domain and domain.lower() not in ("workgroup", "host"): + fqdn = f"{hostname}.{domain}"[:255] + + params = { + "hostname": hostname, + "fqdn": fqdn, + "domain_ltd": domain, + "os_family": os_family, + "os_version": os_version, + "machine_type": "vm", + "etat": etat, + "site": site, + "vcenter_vm_name": vcenter_vm, + "responsable_nom": responsable, + "patch_owner_details": details, + } + + if args.dry_run: + inserted += 1 + continue + + try: + conn.execute(text(""" + INSERT INTO servers + (hostname, fqdn, domain_ltd, os_family, os_version, machine_type, + etat, site, vcenter_vm_name, responsable_nom, patch_owner_details) + VALUES + (:hostname, :fqdn, :domain_ltd, :os_family, :os_version, :machine_type, + :etat, :site, :vcenter_vm_name, :responsable_nom, :patch_owner_details) + ON CONFLICT (hostname) DO UPDATE SET + fqdn = EXCLUDED.fqdn, + domain_ltd = EXCLUDED.domain_ltd, + os_family = EXCLUDED.os_family, + os_version = EXCLUDED.os_version, + etat = EXCLUDED.etat, + site = EXCLUDED.site, + vcenter_vm_name = EXCLUDED.vcenter_vm_name, + responsable_nom = EXCLUDED.responsable_nom, + patch_owner_details = EXCLUDED.patch_owner_details + """), params) + inserted += 1 + if ip: + ip_map.append((hostname, ip)) + except Exception as e: + print(f" [ERR] {hostname}: {e}") + skipped += 1 + + if not args.dry_run: + for hn, ip in ip_map: + try: + sid = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"), + {"h": hn}).fetchone() + if sid: + conn.execute(text(""" + INSERT INTO server_ips (server_id, ip_address, is_primary) + VALUES (:sid, :ip, true) ON CONFLICT DO NOTHING + """), {"sid": sid.id, "ip": ip}) + except Exception: + pass + + print(f"[DONE] Inséré/mis à jour: {inserted} | Ignoré: {skipped}") + + +if __name__ == "__main__": + main()