From de41b66a34934ea29d59119bf6e036b16e17431b Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Tue, 14 Apr 2026 20:18:06 +0200 Subject: [PATCH] Add import_domain_ltd_from_qualys: extrait domain_ltd depuis Qualys FQDN --- tools/import_domain_ltd_from_qualys.py | 70 ++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tools/import_domain_ltd_from_qualys.py diff --git a/tools/import_domain_ltd_from_qualys.py b/tools/import_domain_ltd_from_qualys.py new file mode 100644 index 0000000..21e6512 --- /dev/null +++ b/tools/import_domain_ltd_from_qualys.py @@ -0,0 +1,70 @@ +"""Complete servers.domain_ltd depuis qualys_assets.fqdn. + +Pour chaque serveur sans domain_ltd renseigne, extrait le domaine +(tout ce qui suit le 1er point) depuis le FQDN Qualys. + +Ex: fqdn='vposapapp1.sanef.groupe' -> domain_ltd='sanef.groupe' + +Usage: + python tools/import_domain_ltd_from_qualys.py [--dry-run] [--overwrite] +""" +import os +import argparse +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--overwrite", action="store_true", + help="Remplace domain_ltd existant (sinon ne remplit que les vides)") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + where_clause = "" if args.overwrite else "AND (s.domain_ltd IS NULL OR s.domain_ltd = '')" + + rows = conn.execute(text(f""" + SELECT s.id as sid, s.hostname, s.domain_ltd, qa.fqdn + FROM servers s + JOIN qualys_assets qa ON (qa.server_id = s.id + OR LOWER(qa.hostname) = LOWER(s.hostname)) + WHERE qa.fqdn IS NOT NULL + AND qa.fqdn != '' + AND qa.fqdn LIKE '%.%' + {where_clause} + """)).fetchall() + print(f"[INFO] {len(rows)} candidats") + + updated = skipped = unchanged = 0 + for r in rows: + # Extract domain = tout apres le 1er point du FQDN + parts = r.fqdn.split(".", 1) + if len(parts) != 2 or not parts[1]: + skipped += 1 + continue + new_domain = parts[1].strip().lower()[:50] + if not new_domain: + skipped += 1 + continue + if r.domain_ltd == new_domain: + unchanged += 1 + continue + if args.dry_run: + print(f" DRY: {r.hostname:25s} {r.domain_ltd or '(vide)':20s} -> {new_domain}") + else: + conn.execute(text("UPDATE servers SET domain_ltd=:d WHERE id=:sid"), + {"d": new_domain, "sid": r.sid}) + updated += 1 + + conn.close() + print(f"\n[DONE] Maj: {updated} | Inchanges: {unchanged} | Skip: {skipped}") + + +if __name__ == "__main__": + main()