"""Import SANEF IP addresses (iTop 'Interface réseau' export) into the server_ips table.

Usage:
    python tools/import_sanef_ips.py <csv_path> [--replace]
"""
|
|
import csv
|
|
import os
|
|
import argparse
|
|
from sqlalchemy import create_engine, text
|
|
|
|
# Fallback used when neither environment variable provides a non-empty value.
# NOTE(review): this default embeds a password in the source file; consider
# requiring the environment variable instead.
_DEFAULT_DATABASE_URL = (
    "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)

# Resolution order: DATABASE_URL_DEMO, then DATABASE_URL, then the fallback.
DATABASE_URL = (
    os.environ.get("DATABASE_URL_DEMO")
    or os.environ.get("DATABASE_URL")
    or _DEFAULT_DATABASE_URL
)
|
|
|
|
|
|
def _read_csv(path):
    """Load the CSV export and return ``(header, rows, delimiter)``.

    The delimiter is guessed from the first 4096 characters: ``;`` when it
    occurs more often than ``,``, otherwise ``,``.  ``header`` is ``None``
    when the file contains no line at all.
    """
    # utf-8-sig drops a leading BOM when the export carries one.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        sample = f.read(4096)
        f.seek(0)
        delim = ";" if sample.count(";") > sample.count(",") else ","
        reader = csv.reader(f, delimiter=delim)
        # A default avoids a bare StopIteration on an empty file.
        header = next(reader, None)
        rows = list(reader)
    return header, rows, delim


def _find_idx(header, candidates):
    """Return the index of the FIRST header column matching a candidate, or -1.

    Matching is case-insensitive and ignores surrounding whitespace.  The
    header is scanned left to right, so the order of ``candidates`` does not
    express a priority.
    """
    wanted = {cand.strip().lower() for cand in candidates}
    for i, name in enumerate(header):
        if name.strip().lower() in wanted:
            return i
    return -1


def _cell(row, i):
    """Return the stripped value of column ``i`` in ``row``, or "" if absent."""
    return row[i].strip() if 0 <= i < len(row) else ""


def main():
    """Import the IP addresses of a CSV export into the ``server_ips`` table.

    Command line: ``<csv_path> [--replace]``.  For every CSV row the host name
    (lower-cased, domain part removed) is looked up in ``servers.hostname``;
    when found and the (server, IP) pair is not yet present, a row is inserted
    into ``server_ips`` with ``ip_type='primary'`` and a description built from
    the VRF / gateway / mask columns (truncated to 200 characters).

    Counters printed at the end:
      * linked  -- rows inserted;
      * missing -- host name not found in ``servers``;
      * skipped -- empty host/IP, pair already present, or per-row DB error.

    Returns ``None`` in every case; an empty CSV or a missing host/IP column
    is reported on stdout and aborts before the database is touched.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_path")
    parser.add_argument("--replace", action="store_true",
                        help="Vider server_ips avant l'import")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Only the part after '@' is printed so credentials stay out of the output.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
    print(f"[INFO] CSV: {args.csv_path}")

    header, rows, delim = _read_csv(args.csv_path)
    if header is None:
        print("[ERR] CSV vide")
        return
    print(f"[INFO] {len(rows)} lignes (delim={delim!r})")
    print(f"[INFO] Colonnes: {header[:12]}...")

    idx_host = _find_idx(header, ["Machine virtuelle->Nom", "Machine virtuelle",
                                  "Serveur->Nom", "Serveur",
                                  "Système->Nom", "Nom"])
    idx_ip = _find_idx(header, ["Adresse IP", "IP"])
    idx_gw = _find_idx(header, ["Passerelle"])
    idx_mask = _find_idx(header, ["Masque de sous réseau"])
    idx_vrf = _find_idx(header, ["VRF"])
    print(f"[INFO] Indices: host={idx_host} ip={idx_ip} gw={idx_gw} mask={idx_mask} vrf={idx_vrf}")

    if idx_host == -1 or idx_ip == -1:
        print("[ERR] Colonne hostname ou IP introuvable")
        return

    linked = 0
    missing = 0
    skipped = 0

    # The context manager closes the connection even when a statement outside
    # the per-row try block raises.
    # NOTE(review): with AUTOCOMMIT the --replace DELETE takes effect
    # immediately, so a later failure leaves the table partially filled.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        if args.replace:
            conn.execute(text("DELETE FROM server_ips"))
            print("[OK] server_ips vidé")

        for row in rows:
            # Keep only the short host name: "Host.example.org" -> "host".
            vm_name = _cell(row, idx_host).split(".")[0].lower()
            ip = _cell(row, idx_ip)
            gw = _cell(row, idx_gw) or None
            mask = _cell(row, idx_mask) or None
            vrf = _cell(row, idx_vrf) or None

            if not vm_name or not ip:
                skipped += 1
                continue

            srv = conn.execute(text("SELECT id FROM servers WHERE hostname=:h"),
                               {"h": vm_name}).fetchone()
            if not srv:
                missing += 1
                continue

            # Best effort per row: a bad value (e.g. a string the database
            # cannot cast to inet) is reported and counted as skipped instead
            # of aborting the whole import.
            try:
                existing = conn.execute(text(
                    "SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=CAST(:ip AS inet)"
                ), {"sid": srv.id, "ip": ip}).fetchone()
                if existing:
                    skipped += 1
                    continue

                desc_parts = [
                    f"{label}={value}"
                    for label, value in (("VRF", vrf), ("GW", gw), ("MASK", mask))
                    if value
                ]
                desc = " ".join(desc_parts)[:200] or None
                conn.execute(text("""
                    INSERT INTO server_ips (server_id, ip_address, ip_type, description)
                    VALUES (:sid, CAST(:ip AS inet), 'primary', :d)
                """), {"sid": srv.id, "ip": ip, "d": desc})
                linked += 1
            except Exception as e:
                print(f"  [ERR] {vm_name} {ip}: {str(e)[:150]}")
                skipped += 1

    print(f"[DONE] Liés: {linked} | Hostname introuvable: {missing} | Ignoré: {skipped}")
|
|
|
|
|
|
# Script entry point: run the import only when executed directly.
if __name__ == "__main__":
    main()
|