patchcenter/tools/import_sanef_assets.py

161 lines
6.0 KiB
Python

"""Import SANEF assets CSV → table servers de PatchCenter.
Usage:
python tools/import_sanef_assets.py <csv_path> [--truncate] [--dry-run]
Le CSV doit être séparé par des virgules, valeurs entre guillemets,
avec les colonnes telles qu'exportées d'iTop SANEF (Nom, Hostname,
Domaine, IP, Famille OS->Nom, Version OS->Nom, Lieu->Nom, etc.).
"""
import csv
import os
import sys
import argparse
from sqlalchemy import create_engine, text
# Connection string, resolved in order: DATABASE_URL_DEMO, then DATABASE_URL,
# then a local demo database.
# NOTE(review): the fallback embeds a plaintext password in the source —
# consider requiring one of the environment variables instead.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)
def norm_os_family(famille):
    """Classify an OS family label as ``"windows"``, ``"linux"`` or ``None``.

    The label is stripped and lower-cased, then matched by substring.
    "windows" is tested first; a label containing "linux" or "oracle" is
    reported as Linux. ``None``, empty or unrecognised labels give ``None``.
    """
    label = famille.strip().lower() if famille else ""
    # Order matters: the first family with a matching keyword wins.
    families = (
        ("windows", ("windows",)),
        ("linux", ("linux", "oracle")),
    )
    for family, keywords in families:
        if any(keyword in label for keyword in keywords):
            return family
    return None
def norm_etat(status, etat):
    """Map a free-text state label onto the values the CHECK constraint allows.

    Allowed results: production, implementation, stock, obsolete, eol.

    Only ``etat`` is inspected (stripped, lower-cased, substring match);
    ``status`` is accepted but not consulted. Anything unmatched, including
    ``None`` or an empty label, falls back to ``"production"``.
    """
    label = etat.strip().lower() if etat else ""
    # Order matters: the first rule with a matching keyword wins.
    rules = (
        (("stock",), "stock"),
        (("implémentation", "implementation"), "implementation"),
        (("obsol",), "obsolete"),
        (("eol",), "eol"),
    )
    for keywords, value in rules:
        for keyword in keywords:
            if keyword in label:
                return value
    return "production"
def _csv_value(row, column, limit=None):
    """Return the stripped value of *column*, truncated to *limit*, or None if empty."""
    value = (row.get(column) or "").strip()
    if limit is not None:
        value = value[:limit]
    return value or None


def _server_params(row):
    """Translate one CSV row into bind parameters for the ``servers`` INSERT.

    Returns ``(params, ip)``, or ``None`` when the row has no usable hostname
    (blank, or starting with "..." or "#").
    """
    hostname = (row.get("Nom") or row.get("Hostname") or "").strip()
    if not hostname or hostname.startswith(("...", "#")):
        return None

    domain = _csv_value(row, "Domaine", 50)
    fqdn = None
    # Only dotted domains yield an FQDN; "workgroup" / "host" are placeholders.
    if domain and "." in domain and domain.lower() not in ("workgroup", "host"):
        fqdn = f"{hostname}.{domain}"[:255]

    detail_parts = (
        _csv_value(row, "Fonction"),
        _csv_value(row, "Description"),
        _csv_value(row, "SAR description"),
    )
    details = " | ".join(part for part in detail_parts if part)[:2000] or None

    params = {
        "hostname": hostname,
        "fqdn": fqdn,
        "domain_ltd": domain,
        "os_family": norm_os_family(row.get("Famille OS->Nom") or row.get("Famille OS")),
        "os_version": _csv_value(row, "Version OS->Nom", 200),
        "machine_type": "vm",
        "etat": norm_etat(row.get("Status"), row.get("Etat")),
        "site": _csv_value(row, "Lieu->Nom", 50),
        "vcenter_vm_name": _csv_value(row, "vCluster / Hyperviseur->Nom", 100),
        "responsable_nom": _csv_value(row, "Logiciel->Responsable Domaine DTS"),
        "patch_owner_details": details,
    }
    return params, _csv_value(row, "IP")


def main() -> None:
    """Import the CSV given on the command line into ``servers`` / ``server_ips``.

    Command-line arguments:
        csv_path    Path of the CSV export to read (UTF-8, optional BOM).
        --truncate  Clear the tables referencing ``servers`` and truncate
                    ``servers`` before importing.
        --dry-run   Read and count rows without executing any write.

    Everything runs in one transaction. Each row insert and each IP insert is
    wrapped in its own SAVEPOINT, so a failing statement is reported and
    skipped without invalidating the surrounding transaction.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_path")
    parser.add_argument("--truncate", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Only the part after '@' is printed so credentials never reach the log.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
    print(f"[INFO] CSV: {args.csv_path}")

    with open(args.csv_path, "r", encoding="utf-8-sig", newline="") as f:
        rows = list(csv.DictReader(f))
    print(f"[INFO] {len(rows)} lignes lues dans le CSV")

    # Statements are built once instead of once per row.
    insert_server = text("""
        INSERT INTO servers
        (hostname, fqdn, domain_ltd, os_family, os_version, machine_type,
        etat, site, vcenter_vm_name, responsable_nom, patch_owner_details)
        VALUES
        (:hostname, :fqdn, :domain_ltd, :os_family, :os_version, :machine_type,
        :etat, :site, :vcenter_vm_name, :responsable_nom, :patch_owner_details)
    """)
    select_server_id = text("SELECT id FROM servers WHERE hostname=:h")
    insert_ip = text("""
        INSERT INTO server_ips (server_id, ip_address, is_primary)
        VALUES (:sid, :ip, true) ON CONFLICT DO NOTHING
    """)

    inserted = 0
    skipped = 0
    ip_map = []  # (hostname, ip) pairs for rows that were actually inserted

    with engine.begin() as conn:
        if args.truncate:
            print("[INFO] Nettoyage des tables référençant servers")
            if not args.dry_run:
                fk_tables = conn.execute(text("""
                    SELECT DISTINCT tc.table_name
                    FROM information_schema.table_constraints tc
                    JOIN information_schema.constraint_column_usage ccu
                    ON tc.constraint_name = ccu.constraint_name
                    WHERE tc.constraint_type = 'FOREIGN KEY'
                    AND ccu.table_name = 'servers'
                    AND tc.table_name != 'servers'
                """)).fetchall()
                # Table names come from the catalog, not from user input.
                for (tbl,) in fk_tables:
                    conn.execute(text(f"DELETE FROM {tbl}"))
                    print(f" [OK] cleared {tbl}")
                conn.execute(text("TRUNCATE servers RESTART IDENTITY CASCADE"))
                print(" [OK] servers truncated")

        for row in rows:
            mapped = _server_params(row)
            if mapped is None:
                skipped += 1
                continue
            params, ip = mapped
            hostname = params["hostname"]

            if args.dry_run:
                inserted += 1
                continue

            try:
                # The context manager releases the SAVEPOINT on success and
                # rolls back to it on error, keeping the outer transaction usable.
                with conn.begin_nested():
                    conn.execute(insert_server, params)
            except Exception as e:
                print(f" [ERR] {hostname}: {str(e)[:200]}")
                skipped += 1
                continue

            inserted += 1
            if ip:
                ip_map.append((hostname, ip))

        if not args.dry_run:
            for hn, ip in ip_map:
                try:
                    # Without a SAVEPOINT, one bad IP would abort the whole
                    # transaction and silently discard every inserted server.
                    with conn.begin_nested():
                        sid = conn.execute(select_server_id, {"h": hn}).fetchone()
                        if sid:
                            conn.execute(insert_ip, {"sid": sid.id, "ip": ip})
                except Exception as e:
                    # Best effort: report the failure and carry on with the next IP.
                    print(f" [ERR] {hn} (IP {ip}): {str(e)[:200]}")

    # Reached only after the transaction block has exited without raising.
    print(f"[DONE] Inséré/mis à jour: {inserted} | Ignoré: {skipped}")
# Run the import only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()