"""Enrichit servers avec les infos Qualys manquantes. Pour chaque server matche a un qualys_asset (via server_id ou hostname), comble: - fqdn si vide - os_family si vide (linux/windows) - os_version si vide (qa.os) - domain_ltd si vide (extrait du fqdn) Usage: python tools/enrich_servers_from_qualys.py [--dry-run] [--overwrite] """ import os import argparse from sqlalchemy import create_engine, text DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" def infer_os_family(os_text): if not os_text: return None low = os_text.lower() if any(k in low for k in ("linux", "red hat", "centos", "debian", "ubuntu", "oracle linux", "suse", "rocky", "alma")): return "linux" if "windows" in low: return "windows" return None def main(): parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") parser.add_argument("--overwrite", action="store_true", help="Ecrase les valeurs existantes (defaut: ne remplit que les vides)") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") rows = conn.execute(text(""" SELECT s.id as sid, s.hostname, s.fqdn as s_fqdn, s.os_family as s_osf, s.os_version as s_osv, s.domain_ltd as s_dom, qa.fqdn as q_fqdn, qa.os as q_os, qa.os_family as q_osf FROM servers s JOIN qualys_assets qa ON (qa.server_id = s.id OR LOWER(qa.hostname) = LOWER(s.hostname)) """)).fetchall() print(f"[INFO] {len(rows)} matches servers<->qualys_assets") stats = {"updated": 0, "unchanged": 0} field_counts = {"fqdn": 0, "os_family": 0, "os_version": 0, "domain_ltd": 0} for r in rows: updates = {} # FQDN new_fqdn = r.q_fqdn.strip() if r.q_fqdn else None if new_fqdn and (args.overwrite or not (r.s_fqdn or "").strip()): if new_fqdn != r.s_fqdn: updates["fqdn"] = new_fqdn[:255] # OS family osf = r.q_osf or infer_os_family(r.q_os) if osf and (args.overwrite or not (r.s_osf or "").strip()): if osf != r.s_osf: updates["os_family"] = osf # OS version (texte) if r.q_os and (args.overwrite or not (r.s_osv or "").strip()): if r.q_os != r.s_osv: updates["os_version"] = r.q_os[:200] # Domain_ltd depuis FQDN fqdn_source = new_fqdn or r.s_fqdn if fqdn_source and "." in fqdn_source: parts = fqdn_source.split(".", 1) if len(parts) == 2 and parts[1]: new_dom = parts[1].lower().strip()[:50] if new_dom and (args.overwrite or not (r.s_dom or "").strip()): if new_dom != r.s_dom: updates["domain_ltd"] = new_dom if not updates: stats["unchanged"] += 1 continue for k in updates: field_counts[k] += 1 if args.dry_run: print(f" DRY: {r.hostname:25s} {updates}") else: sets = ", ".join(f"{k}=:{k}" for k in updates) params = dict(updates); params["sid"] = r.sid conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"), params) stats["updated"] += 1 conn.close() print(f"\n[DONE] Maj: {stats['updated']} | Inchanges: {stats['unchanged']}") print("Par champ:") for k, n in field_counts.items(): if n: print(f" {k:15s} {n}") if __name__ == "__main__": main()