patchcenter/tools/import_ips_from_assets.py

106 lines
4.2 KiB
Python

"""Import IPs depuis la colonne IP du CSV iTop VMs/Physical -> server_ips.
Plus simple que Interface Réseau: l'IP principale est attribut direct
sur la VM dans iTop (colonne 'IP' du CSV export).
Usage:
python tools/import_ips_from_assets.py <csv1> [<csv2> ...] [--truncate] [--dry-run]
"""
import os
import csv
import argparse
import re
from sqlalchemy import create_engine, text
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("csv_paths", nargs="+")
parser.add_argument("--truncate", action="store_true")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
if args.truncate and not args.dry_run:
conn.execute(text("TRUNCATE server_ips RESTART IDENTITY CASCADE"))
print("[INFO] server_ips vidée")
stats = {"inserted": 0, "updated": 0, "no_server": 0, "skipped": 0, "bad_ip": 0}
for path in args.csv_paths:
print(f"\n[INFO] Lecture {path}")
with open(path, "r", encoding="utf-8-sig", newline="") as f:
sample = f.read(4096); f.seek(0)
delim = ";" if sample.count(";") > sample.count(",") else ","
rows = list(csv.DictReader(f, delimiter=delim))
print(f"[INFO] {len(rows)} lignes")
for r in rows:
hostname = (r.get("Nom") or r.get("Hostname") or "").strip()
if not hostname or not any(c.isalpha() for c in hostname):
stats["skipped"] += 1
continue
hostname = hostname.split(".")[0].lower()
ip = (r.get("IP") or "").strip()
if not ip:
stats["skipped"] += 1
continue
# Gere multiples IPs separees par virgule/espace
candidates = re.split(r"[,\s;]+", ip)
ips_valid = [c for c in candidates if IP_REGEX.match(c)]
if not ips_valid:
stats["bad_ip"] += 1
continue
vrf = (r.get("VRF") or "").strip()[:50] or None
srv = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"),
{"h": hostname}).fetchone()
if not srv:
stats["no_server"] += 1
continue
for idx, ip_val in enumerate(ips_valid):
ip_type = "primary" if idx == 0 else "secondary"
desc = f"VRF={vrf}" if vrf else None
if args.dry_run:
print(f" DRY: {hostname:20s} {ip_val} [{ip_type}]")
stats["inserted"] += 1
continue
try:
existing = conn.execute(text(
"SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=CAST(:ip AS inet)"
), {"sid": srv.id, "ip": ip_val}).fetchone()
if existing:
conn.execute(text(
"UPDATE server_ips SET ip_type=:t, description=:d WHERE id=:id"
), {"t": ip_type, "d": desc, "id": existing.id})
stats["updated"] += 1
else:
conn.execute(text("""
INSERT INTO server_ips (server_id, ip_address, ip_type, description)
VALUES (:sid, CAST(:ip AS inet), :t, :d)
"""), {"sid": srv.id, "ip": ip_val, "t": ip_type, "d": desc})
stats["inserted"] += 1
except Exception as e:
print(f" [ERR] {hostname} {ip_val}: {str(e)[:120]}")
stats["skipped"] += 1
conn.close()
print(f"\n[DONE] Inserts: {stats['inserted']} | Updates: {stats['updated']} "
f"| Sans serveur: {stats['no_server']} | IP invalide: {stats['bad_ip']} "
f"| Skip: {stats['skipped']}")
if __name__ == "__main__":
main()