"""Import SANEF assets CSV → table servers de PatchCenter. Usage: python tools/import_sanef_assets.py [--truncate] Le CSV doit être séparé par des virgules, valeurs entre guillemets, avec les colonnes telles qu'exportées d'iTop SANEF (Nom, Hostname, Domaine, IP, Famille OS->Nom, Version OS->Nom, Lieu->Nom, etc.). """ import csv import os import sys import argparse from sqlalchemy import create_engine, text DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" def norm_os_family(famille): f = (famille or "").strip().lower() if "windows" in f: return "windows" if "linux" in f or "oracle" in f: return "linux" return None def norm_etat(status, etat): s = (status or "").strip().lower() e = (etat or "").strip().lower() if "stock" in e: return "stock" if s == "recette" or e == "recette": return "recette" if "pr" in s and "prod" in s: return "preprod" if "veloppement" in s: return "dev" return "production" def main(): parser = argparse.ArgumentParser() parser.add_argument("csv_path") parser.add_argument("--truncate", action="store_true") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") print(f"[INFO] CSV: {args.csv_path}") with open(args.csv_path, "r", encoding="utf-8-sig", newline="") as f: reader = csv.DictReader(f) rows = list(reader) print(f"[INFO] {len(rows)} lignes lues dans le CSV") with engine.begin() as conn: if args.truncate: print("[INFO] Nettoyage des tables dépendantes + TRUNCATE servers") if not args.dry_run: for tbl in ("server_ips", "qualys_assets", "audits", "session_servers", "planning_sessions", "patch_history", "ref_correspondance"): try: conn.execute(text(f"DELETE FROM {tbl}")) except Exception as e: print(f" [WARN] DELETE {tbl}: {e}") conn.execute(text("TRUNCATE servers RESTART IDENTITY CASCADE")) inserted = 0 skipped = 0 ip_map = [] for r in rows: hostname = (r.get("Nom") or r.get("Hostname") or "").strip() if not hostname or hostname.startswith(("...", "#")): skipped += 1 continue os_family = norm_os_family(r.get("Famille OS->Nom") or r.get("Famille OS")) os_version = (r.get("Version OS->Nom") or "").strip()[:200] or None ip = (r.get("IP") or "").strip() or None domain = (r.get("Domaine") or "").strip()[:50] or None site = (r.get("Lieu->Nom") or "").strip()[:50] or None etat = norm_etat(r.get("Status"), r.get("Etat")) responsable = (r.get("Logiciel->Responsable Domaine DTS") or "").strip() or None vcenter_vm = (r.get("vCluster / Hyperviseur->Nom") or "").strip()[:100] or None desc = (r.get("Description") or "").strip() fonction = (r.get("Fonction") or "").strip() sar = (r.get("SAR description") or "").strip() details = " | ".join(x for x in (fonction, desc, sar) if x)[:2000] or None fqdn = None if domain and "." in domain and domain.lower() not in ("workgroup", "host"): fqdn = f"{hostname}.{domain}"[:255] params = { "hostname": hostname, "fqdn": fqdn, "domain_ltd": domain, "os_family": os_family, "os_version": os_version, "machine_type": "vm", "etat": etat, "site": site, "vcenter_vm_name": vcenter_vm, "responsable_nom": responsable, "patch_owner_details": details, } if args.dry_run: inserted += 1 continue try: conn.execute(text(""" INSERT INTO servers (hostname, fqdn, domain_ltd, os_family, os_version, machine_type, etat, site, vcenter_vm_name, responsable_nom, patch_owner_details) VALUES (:hostname, :fqdn, :domain_ltd, :os_family, :os_version, :machine_type, :etat, :site, :vcenter_vm_name, :responsable_nom, :patch_owner_details) ON CONFLICT (hostname) DO UPDATE SET fqdn = EXCLUDED.fqdn, domain_ltd = EXCLUDED.domain_ltd, os_family = EXCLUDED.os_family, os_version = EXCLUDED.os_version, etat = EXCLUDED.etat, site = EXCLUDED.site, vcenter_vm_name = EXCLUDED.vcenter_vm_name, responsable_nom = EXCLUDED.responsable_nom, patch_owner_details = EXCLUDED.patch_owner_details """), params) inserted += 1 if ip: ip_map.append((hostname, ip)) except Exception as e: print(f" [ERR] {hostname}: {e}") skipped += 1 if not args.dry_run: for hn, ip in ip_map: try: sid = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"), {"h": hn}).fetchone() if sid: conn.execute(text(""" INSERT INTO server_ips (server_id, ip_address, is_primary) VALUES (:sid, :ip, true) ON CONFLICT DO NOTHING """), {"sid": sid.id, "ip": ip}) except Exception: pass print(f"[DONE] Inséré/mis à jour: {inserted} | Ignoré: {skipped}") if __name__ == "__main__": main()