Add link_qualys_by_ip: lie qualys_assets a servers via IP quand hostname mismatch (cas node3->vdameasxt3)

This commit is contained in:
Pierre & Lumière 2026-04-15 12:22:04 +02:00
parent 0dc9b07edd
commit 36c638c8ce

View File

@ -0,0 +1,81 @@
"""Lie qualys_assets a servers via IP quand le hostname ne match pas.
Cas d'usage : Qualys enregistre l'asset sous un display name (ex 'node3') alors
que le vrai serveur dans iTop/PatchCenter s'appelle differemment (ex 'vdameasxt3').
On retombe sur le matching par IP via server_ips.
Ne touche QUE les qualys_assets dont server_id est NULL.
Usage:
python tools/link_qualys_by_ip.py [--dry-run]
"""
import os
import argparse
from sqlalchemy import create_engine, text
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
# Recupere les qualys_assets sans lien serveur, avec une IP
rows = conn.execute(text("""
SELECT id, qualys_asset_id, hostname, name, fqdn, ip_address::text as ip
FROM qualys_assets
WHERE server_id IS NULL
AND ip_address IS NOT NULL
ORDER BY hostname
""")).fetchall()
print(f"[INFO] {len(rows)} qualys_assets sans server_id avec une IP")
stats = {"linked": 0, "no_match": 0, "ambiguous": 0}
no_match = []
ambiguous = []
for r in rows:
# IP peut etre 'a.b.c.d' ou 'a.b.c.d/32', strip /XX
ip = r.ip.split("/")[0]
# Cherche server_ips matching cette IP
matches = conn.execute(text("""
SELECT s.id, s.hostname FROM servers s
JOIN server_ips si ON si.server_id = s.id
WHERE si.ip_address = CAST(:ip AS inet)
"""), {"ip": ip}).fetchall()
if len(matches) == 0:
stats["no_match"] += 1
no_match.append((r.hostname or r.name, ip))
continue
if len(matches) > 1:
stats["ambiguous"] += 1
ambiguous.append((r.hostname, ip, [m.hostname for m in matches]))
continue
sid = matches[0].id
srv_name = matches[0].hostname
if args.dry_run:
print(f" DRY: {r.hostname or r.name:25s} ({ip}) -> server '{srv_name}' (id={sid})")
else:
conn.execute(text("UPDATE qualys_assets SET server_id=:sid WHERE id=:qid"),
{"sid": sid, "qid": r.id})
print(f" OK : {r.hostname or r.name:25s} ({ip}) -> '{srv_name}'")
stats["linked"] += 1
conn.close()
print(f"\n[DONE] Lies: {stats['linked']} | Sans match IP: {stats['no_match']} | Ambigus: {stats['ambiguous']}")
if ambiguous:
print("\n[WARN] Cas ambigus (plusieurs servers ont la meme IP) :")
for h, ip, srvs in ambiguous[:10]:
print(f" {h} ({ip}) -> {srvs}")
if __name__ == "__main__":
main()