patchcenter/tools/import_sanef_ips.py

113 lines
3.8 KiB
Python

"""Import des IPs SANEF (export iTop 'Interface réseau') → table server_ips.
Usage:
python tools/import_sanef_ips.py <csv_path>
"""
import csv
import os
import argparse
from sqlalchemy import create_engine, text
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
def main():
parser = argparse.ArgumentParser()
parser.add_argument("csv_path")
parser.add_argument("--replace", action="store_true",
help="Vider server_ips avant l'import")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
print(f"[INFO] CSV: {args.csv_path}")
with open(args.csv_path, "r", encoding="utf-8-sig", newline="") as f:
sample = f.read(4096); f.seek(0)
delim = ";" if sample.count(";") > sample.count(",") else ","
reader = csv.reader(f, delimiter=delim)
header = next(reader)
rows = list(reader)
print(f"[INFO] {len(rows)} lignes (delim={delim!r})")
print(f"[INFO] Colonnes: {header[:12]}...")
def find_idx(candidates):
"""Retourne l'indice de la PREMIERE colonne matchant."""
for i, h in enumerate(header):
if any(cand.lower() == h.lower() for cand in candidates):
return i
return -1
idx_host = find_idx(["Machine virtuelle->Nom", "Machine virtuelle",
"Serveur->Nom", "Serveur",
"Système->Nom", "Nom"])
idx_ip = find_idx(["Adresse IP", "IP"])
idx_gw = find_idx(["Passerelle"])
idx_mask = find_idx(["Masque de sous réseau"])
idx_vrf = find_idx(["VRF"])
print(f"[INFO] Indices: host={idx_host} ip={idx_ip} gw={idx_gw} mask={idx_mask} vrf={idx_vrf}")
if idx_host == -1 or idx_ip == -1:
print("[ERR] Colonne hostname ou IP introuvable")
return
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
if args.replace:
conn.execute(text("DELETE FROM server_ips"))
print("[OK] server_ips vidé")
linked = 0
missing = 0
skipped = 0
for row in rows:
def cell(i):
return row[i].strip() if 0 <= i < len(row) else ""
vm_name = cell(idx_host)
if vm_name:
vm_name = vm_name.split(".")[0].lower()
ip = cell(idx_ip)
gw = cell(idx_gw) or None
mask = cell(idx_mask) or None
vrf = cell(idx_vrf) or None
if not vm_name or not ip:
skipped += 1
continue
srv = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"),
{"h": vm_name}).fetchone()
if not srv:
missing += 1
continue
try:
existing = conn.execute(text(
"SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=CAST(:ip AS inet)"
), {"sid": srv.id, "ip": ip}).fetchone()
if existing:
skipped += 1
continue
desc_parts = []
if vrf: desc_parts.append(f"VRF={vrf}")
if gw: desc_parts.append(f"GW={gw}")
if mask: desc_parts.append(f"MASK={mask}")
desc = " ".join(desc_parts)[:200] or None
conn.execute(text("""
INSERT INTO server_ips (server_id, ip_address, ip_type, description)
VALUES (:sid, CAST(:ip AS inet), 'primary', :d)
"""), {"sid": srv.id, "ip": ip, "d": desc})
linked += 1
except Exception as e:
print(f" [ERR] {vm_name} {ip}: {str(e)[:150]}")
skipped += 1
conn.close()
print(f"[DONE] Liés: {linked} | Hostname introuvable: {missing} | Ignoré: {skipped}")
if __name__ == "__main__":
main()