"""Import IPs depuis la colonne IP du CSV iTop VMs/Physical -> server_ips. Plus simple que Interface Réseau: l'IP principale est attribut direct sur la VM dans iTop (colonne 'IP' du CSV export). Usage: python tools/import_ips_from_assets.py [ ...] [--truncate] [--dry-run] """ import os import csv import argparse import re from sqlalchemy import create_engine, text DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") def main(): parser = argparse.ArgumentParser() parser.add_argument("csv_paths", nargs="+") parser.add_argument("--truncate", action="store_true") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") if args.truncate and not args.dry_run: conn.execute(text("TRUNCATE server_ips RESTART IDENTITY CASCADE")) print("[INFO] server_ips vidée") stats = {"inserted": 0, "updated": 0, "no_server": 0, "skipped": 0, "bad_ip": 0} for path in args.csv_paths: print(f"\n[INFO] Lecture {path}") with open(path, "r", encoding="utf-8-sig", newline="") as f: sample = f.read(4096); f.seek(0) delim = ";" if sample.count(";") > sample.count(",") else "," rows = list(csv.DictReader(f, delimiter=delim)) print(f"[INFO] {len(rows)} lignes") for r in rows: hostname = (r.get("Nom") or r.get("Hostname") or "").strip() if not hostname or not any(c.isalpha() for c in hostname): stats["skipped"] += 1 continue hostname = hostname.split(".")[0].lower() ip = (r.get("IP") or "").strip() if not ip: stats["skipped"] += 1 continue # Gere multiples IPs separees par virgule/espace candidates = re.split(r"[,\s;]+", ip) ips_valid = [c for c in candidates if IP_REGEX.match(c)] if not ips_valid: stats["bad_ip"] += 1 continue vrf = (r.get("VRF") or "").strip()[:50] or None srv = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone() if not srv: stats["no_server"] += 1 continue for idx, ip_val in enumerate(ips_valid): ip_type = "primary" if idx == 0 else "secondary" desc = f"VRF={vrf}" if vrf else None if args.dry_run: print(f" DRY: {hostname:20s} {ip_val} [{ip_type}]") stats["inserted"] += 1 continue try: existing = conn.execute(text( "SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=CAST(:ip AS inet)" ), {"sid": srv.id, "ip": ip_val}).fetchone() if existing: conn.execute(text( "UPDATE server_ips SET ip_type=:t, description=:d WHERE id=:id" ), {"t": ip_type, "d": desc, "id": existing.id}) stats["updated"] += 1 else: conn.execute(text(""" INSERT INTO server_ips (server_id, ip_address, ip_type, description) VALUES (:sid, CAST(:ip AS inet), :t, :d) """), {"sid": srv.id, "ip": ip_val, "t": ip_type, "d": desc}) stats["inserted"] += 1 except Exception as e: print(f" [ERR] {hostname} {ip_val}: {str(e)[:120]}") stats["skipped"] += 1 conn.close() print(f"\n[DONE] Inserts: {stats['inserted']} | Updates: {stats['updated']} " f"| Sans serveur: {stats['no_server']} | IP invalide: {stats['bad_ip']} " f"| Skip: {stats['skipped']}") if __name__ == "__main__": main()