From 52cb7b4cfcb5d3dc5e6721e6824746e7204dbb4b Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Tue, 14 Apr 2026 21:54:36 +0200 Subject: [PATCH] Add enrich_servers_from_qualys: pousse fqdn/os_family/os_version/domain_ltd depuis Qualys --- tools/enrich_servers_from_qualys.py | 110 ++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 tools/enrich_servers_from_qualys.py diff --git a/tools/enrich_servers_from_qualys.py b/tools/enrich_servers_from_qualys.py new file mode 100644 index 0000000..fdb75b5 --- /dev/null +++ b/tools/enrich_servers_from_qualys.py @@ -0,0 +1,110 @@ +"""Enrichit servers avec les infos Qualys manquantes. + +Pour chaque server matche a un qualys_asset (via server_id ou hostname), comble: + - fqdn si vide + - os_family si vide (linux/windows) + - os_version si vide (qa.os) + - domain_ltd si vide (extrait du fqdn) + +Usage: + python tools/enrich_servers_from_qualys.py [--dry-run] [--overwrite] +""" +import os +import argparse +from sqlalchemy import create_engine, text + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def infer_os_family(os_text): + if not os_text: + return None + low = os_text.lower() + if any(k in low for k in ("linux", "red hat", "centos", "debian", "ubuntu", + "oracle linux", "suse", "rocky", "alma")): + return "linux" + if "windows" in low: + return "windows" + return None + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--overwrite", action="store_true", + help="Ecrase les valeurs existantes (defaut: ne remplit que les vides)") + args = parser.parse_args() + + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + rows = conn.execute(text(""" + SELECT s.id as sid, s.hostname, + s.fqdn as s_fqdn, s.os_family as s_osf, s.os_version as s_osv, + s.domain_ltd as s_dom, + qa.fqdn as q_fqdn, qa.os as q_os, qa.os_family as q_osf + FROM servers s + JOIN qualys_assets qa ON (qa.server_id = s.id + OR LOWER(qa.hostname) = LOWER(s.hostname)) + """)).fetchall() + print(f"[INFO] {len(rows)} matches servers<->qualys_assets") + + stats = {"updated": 0, "unchanged": 0} + field_counts = {"fqdn": 0, "os_family": 0, "os_version": 0, "domain_ltd": 0} + + for r in rows: + updates = {} + + # FQDN + new_fqdn = r.q_fqdn.strip() if r.q_fqdn else None + if new_fqdn and (args.overwrite or not (r.s_fqdn or "").strip()): + if new_fqdn != r.s_fqdn: + updates["fqdn"] = new_fqdn[:255] + + # OS family + osf = r.q_osf or infer_os_family(r.q_os) + if osf and (args.overwrite or not (r.s_osf or "").strip()): + if osf != r.s_osf: + updates["os_family"] = osf + + # OS version (texte) + if r.q_os and (args.overwrite or not (r.s_osv or "").strip()): + if r.q_os != r.s_osv: + updates["os_version"] = r.q_os[:200] + + # Domain_ltd depuis FQDN + fqdn_source = new_fqdn or r.s_fqdn + if fqdn_source and "." in fqdn_source: + parts = fqdn_source.split(".", 1) + if len(parts) == 2 and parts[1]: + new_dom = parts[1].lower().strip()[:50] + if new_dom and (args.overwrite or not (r.s_dom or "").strip()): + if new_dom != r.s_dom: + updates["domain_ltd"] = new_dom + + if not updates: + stats["unchanged"] += 1 + continue + for k in updates: + field_counts[k] += 1 + + if args.dry_run: + print(f" DRY: {r.hostname:25s} {updates}") + else: + sets = ", ".join(f"{k}=:{k}" for k in updates) + params = dict(updates); params["sid"] = r.sid + conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"), params) + stats["updated"] += 1 + + conn.close() + print(f"\n[DONE] Maj: {stats['updated']} | Inchanges: {stats['unchanged']}") + print("Par champ:") + for k, n in field_counts.items(): + if n: + print(f" {k:15s} {n}") + + +if __name__ == "__main__": + main()