107 lines
4.2 KiB
Python
107 lines
4.2 KiB
Python
"""Import IPs depuis la colonne IP du CSV iTop VMs/Physical -> server_ips.
|
|
|
|
Plus simple que Interface Réseau: l'IP principale est attribut direct
|
|
sur la VM dans iTop (colonne 'IP' du CSV export).
|
|
|
|
Usage:
|
|
python tools/import_ips_from_assets.py <csv1> [<csv2> ...] [--truncate] [--dry-run]
|
|
"""
|
|
import os
|
|
import csv
|
|
import argparse
|
|
import re
|
|
from sqlalchemy import create_engine, text
|
|
|
|
# Connection string, resolved in this order: DATABASE_URL_DEMO, then
# DATABASE_URL, then a local demo default.
# NOTE(review): the fallback embeds a plaintext password in source control;
# consider requiring the environment variable instead.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)

# Shape check only: four dot-separated groups of 1 to 3 digits. Octets are
# not range-checked, so a value such as 999.1.1.1 still matches.
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
|
|
|
|
|
|
def _read_rows(path):
    """Return every row of the CSV file at *path* as a list of dicts.

    The file is decoded as UTF-8 with an optional BOM. The delimiter is
    guessed from the first 4096 characters: ';' when it occurs more often
    than ',', otherwise ','.
    """
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        sample = f.read(4096)
        f.seek(0)
        delim = ";" if sample.count(";") > sample.count(",") else ","
        return list(csv.DictReader(f, delimiter=delim))


def _upsert_ip(conn, server_id, ip_val, is_primary, vrf):
    """Insert or refresh one (server, IP) pair in server_ips.

    Returns the name of the stats counter to increment: "inserted" when a
    new row was created, "updated" when an existing row was rewritten, or
    None when the row already existed and nothing was changed.
    """
    existing = conn.execute(
        text(
            "SELECT id FROM server_ips "
            "WHERE server_id=:sid AND ip_address=CAST(:ip AS inet)"
        ),
        {"sid": server_id, "ip": ip_val},
    ).fetchone()

    if existing:
        # An existing row is only rewritten when the CSV provides a VRF.
        # NOTE(review): "updated" is counted only when an UPDATE is actually
        # issued; the captured source lost its indentation, so confirm this
        # matches the intended counting.
        if vrf:
            conn.execute(
                text("UPDATE server_ips SET vrf=:v, is_primary=:p WHERE id=:id"),
                {"v": vrf, "p": is_primary, "id": existing.id},
            )
            return "updated"
        return None

    conn.execute(
        text(
            "INSERT INTO server_ips (server_id, ip_address, is_primary, vrf) "
            "VALUES (:sid, CAST(:ip AS inet), :p, :v)"
        ),
        {"sid": server_id, "ip": ip_val, "p": is_primary, "v": vrf},
    )
    return "inserted"


def main():
    """Load the 'IP' column of one or more CSV exports into server_ips.

    Command line: one or more CSV paths, plus two optional flags.
    --truncate empties server_ips before importing (ignored together with
    --dry-run). --dry-run prints the rows that would be written without
    writing them; the servers table is still queried.

    For each CSV row:
      * the hostname comes from the 'Nom' or 'Hostname' column, is cut at
        the first dot and lower-cased; rows with no hostname, or a hostname
        without any letter, are counted as skipped;
      * the 'IP' cell may hold several addresses separated by commas,
        semicolons or whitespace; the first one matching IP_REGEX is
        flagged as primary;
      * the optional 'VRF' column is truncated to 50 characters;
      * rows whose hostname is not found in servers.hostname are counted
        as "no_server".

    A summary of all counters is printed at the end.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_paths", nargs="+")
    parser.add_argument("--truncate", action="store_true")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Only the part after '@' is printed, which keeps credentials out of the log.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")

    stats = {"inserted": 0, "updated": 0, "no_server": 0, "skipped": 0, "bad_ip": 0}

    # The context manager closes the connection even when a CSV cannot be
    # read or a statement outside the per-IP try block raises.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        if args.truncate and not args.dry_run:
            conn.execute(text("TRUNCATE server_ips RESTART IDENTITY CASCADE"))
            print("[INFO] server_ips vidée")

        for path in args.csv_paths:
            print(f"\n[INFO] Lecture {path}")
            rows = _read_rows(path)
            print(f"[INFO] {len(rows)} lignes")

            for r in rows:
                hostname = (r.get("Nom") or r.get("Hostname") or "").strip()
                # A name without any letter is not treated as a hostname.
                if not hostname or not any(c.isalpha() for c in hostname):
                    stats["skipped"] += 1
                    continue
                # Keep only the short name: drop the domain part, lower-case.
                hostname = hostname.split(".")[0].lower()

                ip = (r.get("IP") or "").strip()
                if not ip:
                    stats["skipped"] += 1
                    continue

                # The cell may hold several IPs separated by comma,
                # semicolon or whitespace.
                candidates = re.split(r"[,\s;]+", ip)
                ips_valid = [c for c in candidates if IP_REGEX.match(c)]
                if not ips_valid:
                    stats["bad_ip"] += 1
                    continue

                vrf = (r.get("VRF") or "").strip()[:50] or None

                srv = conn.execute(
                    text("SELECT id FROM servers WHERE hostname=:h"),
                    {"h": hostname},
                ).fetchone()
                if not srv:
                    stats["no_server"] += 1
                    continue

                for idx, ip_val in enumerate(ips_valid):
                    # The first valid address of the cell is the primary one.
                    is_primary = (idx == 0)

                    if args.dry_run:
                        print(f" DRY: {hostname:20s} {ip_val} {'(primary)' if is_primary else ''}")
                        stats["inserted"] += 1
                        continue

                    try:
                        outcome = _upsert_ip(conn, srv.id, ip_val, is_primary, vrf)
                    except Exception as e:
                        # Best effort per address: report the failure and
                        # carry on with the remaining addresses and rows.
                        print(f" [ERR] {hostname} {ip_val}: {str(e)[:120]}")
                        stats["skipped"] += 1
                    else:
                        if outcome:
                            stats[outcome] += 1

    print(f"\n[DONE] Inserts: {stats['inserted']} | Updates: {stats['updated']} "
          f"| Sans serveur: {stats['no_server']} | IP invalide: {stats['bad_ip']} "
          f"| Skip: {stats['skipped']}")
|
|
|
|
|
|
# Run the import only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|