From e2f61af818ab9088fdfa8f143e5a53169cff09a9 Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Tue, 14 Apr 2026 19:47:37 +0200 Subject: [PATCH] Add import_ips_from_assets: lit colonne IP directe des CSV VM/physical MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Contourne l'export Interface Réseau (souvent incomplet). Utilise la colonne IP directe du CSV VMs (attribut iTop sur la VM) + VRF si present. Gère plusieurs IPs separees par virgule. --- tools/import_ips_from_assets.py | 106 ++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 tools/import_ips_from_assets.py diff --git a/tools/import_ips_from_assets.py b/tools/import_ips_from_assets.py new file mode 100644 index 0000000..afc0adf --- /dev/null +++ b/tools/import_ips_from_assets.py @@ -0,0 +1,106 @@ +"""Import IPs depuis la colonne IP du CSV iTop VMs/Physical -> server_ips. + +Plus simple que Interface Réseau: l'IP principale est attribut direct +sur la VM dans iTop (colonne 'IP' du CSV export). + +Usage: + python tools/import_ips_from_assets.py [ ...] [--truncate] [--dry-run] +""" +import os +import csv +import argparse +import re +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + +IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("csv_paths", nargs="+") + parser.add_argument("--truncate", action="store_true") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + if args.truncate and not args.dry_run: + conn.execute(text("TRUNCATE server_ips RESTART IDENTITY CASCADE")) + print("[INFO] server_ips vidée") + + stats = {"inserted": 0, "updated": 0, "no_server": 0, "skipped": 0, "bad_ip": 0} + + for path in args.csv_paths: + print(f"\n[INFO] Lecture {path}") + with open(path, "r", encoding="utf-8-sig", newline="") as f: + sample = f.read(4096); f.seek(0) + delim = ";" if sample.count(";") > sample.count(",") else "," + rows = list(csv.DictReader(f, delimiter=delim)) + print(f"[INFO] {len(rows)} lignes") + + for r in rows: + hostname = (r.get("Nom") or r.get("Hostname") or "").strip() + if not hostname or not any(c.isalpha() for c in hostname): + stats["skipped"] += 1 + continue + hostname = hostname.split(".")[0].lower() + + ip = (r.get("IP") or "").strip() + if not ip: + stats["skipped"] += 1 + continue + # Gere multiples IPs separees par virgule/espace + candidates = re.split(r"[,\s;]+", ip) + ips_valid = [c for c in candidates if IP_REGEX.match(c)] + if not ips_valid: + stats["bad_ip"] += 1 + continue + + vrf = (r.get("VRF") or "").strip()[:50] or None + + srv = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"), + {"h": hostname}).fetchone() + if not srv: + stats["no_server"] += 1 + continue + + for idx, ip_val in enumerate(ips_valid): + is_primary = (idx == 0) + if args.dry_run: + print(f" DRY: {hostname:20s} {ip_val} {'(primary)' if is_primary else ''}") + stats["inserted"] += 1 + continue + try: + # Check existant + existing = conn.execute(text( + "SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=:ip::inet" + ), {"sid": srv.id, "ip": ip_val}).fetchone() + if existing: + if vrf: + conn.execute(text( + "UPDATE server_ips SET vrf=:v, is_primary=:p WHERE id=:id" + ), {"v": vrf, "p": is_primary, "id": existing.id}) + stats["updated"] += 1 + else: + conn.execute(text(""" + INSERT INTO server_ips (server_id, ip_address, is_primary, vrf) + VALUES (:sid, :ip::inet, :p, :v) + """), {"sid": srv.id, "ip": ip_val, "p": is_primary, "v": vrf}) + stats["inserted"] += 1 + except Exception as e: + print(f" [ERR] {hostname} {ip_val}: {str(e)[:120]}") + stats["skipped"] += 1 + + conn.close() + print(f"\n[DONE] Inserts: {stats['inserted']} | Updates: {stats['updated']} " + f"| Sans serveur: {stats['no_server']} | IP invalide: {stats['bad_ip']} " + f"| Skip: {stats['skipped']}") + + +if __name__ == "__main__": + main()