"""Fallback IPs depuis qualys_assets.ip_address pour les serveurs sans IP. Utilise les IPs de scan Qualys pour combler les serveurs iTop qui n'ont pas d'Interface Réseau / d'IP directe renseignée. Usage: python tools/import_ips_from_qualys.py [--dry-run] """ import os import argparse from sqlalchemy import create_engine, text DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" def main(): parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") # Match qualys_assets -> servers via server_id (prioritaire) ou hostname rows = conn.execute(text(""" SELECT s.id as sid, s.hostname, qa.ip_address::text as ip_address FROM servers s JOIN qualys_assets qa ON (qa.server_id = s.id OR LOWER(qa.hostname) = LOWER(s.hostname)) WHERE qa.ip_address IS NOT NULL AND NOT EXISTS (SELECT 1 FROM server_ips si WHERE si.server_id = s.id) """)).fetchall() print(f"[INFO] {len(rows)} serveurs candidates (sans IP, avec Qualys asset)") inserted = skipped = 0 for r in rows: ip = r.ip_address.split(",")[0].strip() # 1ere IP si plusieurs ip = ip.split("/")[0] # enleve suffixe /32 (format inet Postgres) if not ip: skipped += 1 continue if args.dry_run: print(f" DRY: {r.hostname:20s} <- {ip}") inserted += 1 continue try: conn.execute(text(""" INSERT INTO server_ips (server_id, ip_address, ip_type, description) VALUES (:sid, CAST(:ip AS inet), 'primary', 'Qualys scan') """), {"sid": r.sid, "ip": ip}) inserted += 1 except Exception as e: print(f" [ERR] {r.hostname} {ip}: {str(e)[:120]}") skipped += 1 conn.close() print(f"\n[DONE] Inserts: {inserted} | Skip: {skipped}") if __name__ == "__main__": main()