From 36c638c8ce5a53296aac54ab26686555c766f281 Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Wed, 15 Apr 2026 12:22:04 +0200 Subject: [PATCH] Add link_qualys_by_ip: lie qualys_assets a servers via IP quand hostname mismatch (cas node3->vdameasxt3) --- tools/link_qualys_by_ip.py | 81 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 tools/link_qualys_by_ip.py diff --git a/tools/link_qualys_by_ip.py b/tools/link_qualys_by_ip.py new file mode 100644 index 0000000..aba8f5d --- /dev/null +++ b/tools/link_qualys_by_ip.py @@ -0,0 +1,81 @@ +"""Lie qualys_assets a servers via IP quand le hostname ne match pas. + +Cas d'usage : Qualys enregistre l'asset sous un display name (ex 'node3') alors +que le vrai serveur dans iTop/PatchCenter s'appelle differemment (ex 'vdameasxt3'). +On retombe sur le matching par IP via server_ips. + +Ne touche QUE les qualys_assets dont server_id est NULL. + +Usage: + python tools/link_qualys_by_ip.py [--dry-run] +""" +import os +import argparse +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + # Recupere les qualys_assets sans lien serveur, avec une IP + rows = conn.execute(text(""" + SELECT id, qualys_asset_id, hostname, name, fqdn, ip_address::text as ip + FROM qualys_assets + WHERE server_id IS NULL + AND ip_address IS NOT NULL + ORDER BY hostname + """)).fetchall() + print(f"[INFO] {len(rows)} qualys_assets sans server_id avec une IP") + + stats = {"linked": 0, "no_match": 0, "ambiguous": 0} + no_match = [] + ambiguous = [] + + for r in rows: + # IP peut etre 'a.b.c.d' ou 'a.b.c.d/32', strip /XX + ip = r.ip.split("/")[0] + # Cherche server_ips matching cette IP + matches = conn.execute(text(""" + SELECT s.id, s.hostname FROM servers s + JOIN server_ips si ON si.server_id = s.id + WHERE si.ip_address = CAST(:ip AS inet) + """), {"ip": ip}).fetchall() + + if len(matches) == 0: + stats["no_match"] += 1 + no_match.append((r.hostname or r.name, ip)) + continue + if len(matches) > 1: + stats["ambiguous"] += 1 + ambiguous.append((r.hostname, ip, [m.hostname for m in matches])) + continue + + sid = matches[0].id + srv_name = matches[0].hostname + if args.dry_run: + print(f" DRY: {r.hostname or r.name:25s} ({ip}) -> server '{srv_name}' (id={sid})") + else: + conn.execute(text("UPDATE qualys_assets SET server_id=:sid WHERE id=:qid"), + {"sid": sid, "qid": r.id}) + print(f" OK : {r.hostname or r.name:25s} ({ip}) -> '{srv_name}'") + stats["linked"] += 1 + + conn.close() + print(f"\n[DONE] Lies: {stats['linked']} | Sans match IP: {stats['no_match']} | Ambigus: {stats['ambiguous']}") + if ambiguous: + print("\n[WARN] Cas ambigus (plusieurs servers ont la meme IP) :") + for h, ip, srvs in ambiguous[:10]: + print(f" {h} ({ip}) -> {srvs}") + + +if __name__ == "__main__": + main()