"""Import servers that are missing from Qualys out of comparaisonv2.xlsx.

Reads the 'RECAP' sheet (column J = COMP1) of deploy/comparaisonv2.xlsx, keeps
the rows whose COMP1 string does NOT contain 'Qualys', and UPSERTs them into
the `qualys_missing_servers` table.

Automatic categorisation through a heuristic on the host name / sources:
  - virtualisation : ESXi / hypervisors (names starting with 'esx', 'vp*esx', ...)
  - ot_scada       : roadside OT equipment (BAC_, BEU_, BOA_, BOP_, BUC_, CAG_, ...;
                     the exact pattern is RE_OT_SCADA)
  - oubli          : present everywhere EXCEPT Qualys (Cyberark + S1 + ITOP)
                     -> priority 1
  - inconnu        : every other case, to be investigated

Usage:
    python tools/import_qualys_missing.py [path_to_file.xlsx]
"""
import os
import sys
import glob
import re
from pathlib import Path
from collections import Counter

import openpyxl
from sqlalchemy import create_engine, text

# Repository root: this script lives one directory below it.
ROOT = Path(__file__).resolve().parent.parent

# Resolution order: DATABASE_URL_DEMO, then DATABASE_URL, then a local default.
# NOTE(review): the fallback URL embeds a plaintext credential in source
# control. It is kept unchanged so existing setups keep working, but it should
# be moved to the environment / a secret store and rotated.
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
                or os.getenv("DATABASE_URL")
                or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")


def find_comparison_file():
    """Return the path of the comparison workbook to import, or None.

    Looks for ``deploy/comparaison*.xlsx`` under ROOT and returns the last
    match in lexicographic order (as a string), or None when nothing matches.
    """
    pattern = str(ROOT / "deploy" / "comparaison*.xlsx")
    files = sorted(glob.glob(pattern))
    return files[-1] if files else None


# --- Categorisation heuristics --------------------------------------------
# Site prefix followed by _L<digits>_S<digits>, case-insensitive.
RE_OT_SCADA = re.compile(r"^(BAC|BEU|BOA|BOP|BUC|CAG|HEU|FLA|FLB|FLC|FLD|FLE|FLF|FLG|FLH|GAR|MEU|PAU|REI)_L\d+_S\d+", re.I)
# Host-name prefixes treated as hypervisors, case-insensitive.
RE_VIRTU = re.compile(r"^(esx|vp.*esx|hyp|hv\d|esxi)", re.I)


def categorize(hostname, sources_present):
    """Classify a host that is absent from Qualys.

    Args:
        hostname: host name as read from the workbook.
        sources_present: the COMP1 string listing where the host was found
            (may be empty/None). Source membership is tested by substring.

    Returns:
        A tuple ``(reason_category, status, priority, reason_detail)``.
        Rules are evaluated in order and the first match wins.
    """
    h = hostname.lower()

    # 1. Roadside OT/SCADA equipment
    if RE_OT_SCADA.match(hostname):
        return ("ot_scada", "exempt", 4,
                "Equipement bord de route / OT (pas d'agent Qualys possible)")

    # 2. Virtualisation
    if RE_VIRTU.match(h):
        return ("virtualisation", "exempt", 5,
                "Hyperviseur ESXi (scan via Qualys connector vCenter, pas d'agent)")

    # 3. Present everywhere EXCEPT Qualys -> plain omission (urgent)
    if sources_present and all(s in sources_present for s in ("Cyberark", "S1", "ITOP")):
        return ("oubli", "a_enroler", 1,
                "Present sur CyberArk + Sentinel + iTop, manque Qualys (oubli enrolement)")

    # 4. Sentinel + iTop (no CyberArk): presumably no known standard admin account
    if sources_present and "S1" in sources_present and "ITOP" in sources_present:
        return ("oubli", "a_enroler", 2,
                "Sentinel + iTop OK, manque CyberArk + Qualys")

    # 5. iTop only: orphan server to investigate
    if sources_present == "ITOP seulement":
        return ("inconnu", "a_traiter", 3,
                "Reference uniquement dans iTop. Serveur eteint? Decom? A verifier")

    # 6. S1 only: Sentinel agent detected but no iTop reference
    if sources_present == "S1 seulement":
        return ("inconnu", "a_traiter", 2,
                "Detecte par Sentinel mais pas iTop. Shadow IT? Asset non reference")

    return ("inconnu", "a_traiter", 3,
            f"Sources presentes: {sources_present or 'aucune'}")


def _cell(row, index):
    """Return ``row[index]``, or None when the row is too short for that column."""
    return row[index] if row and len(row) > index else None


def parse_recap(xlsx_path):
    """Return the rows of the 'RECAP' sheet describing hosts absent from Qualys.

    Args:
        xlsx_path: path of the comparison workbook.

    Returns:
        A list of dicts with the keys ``hostname``, ``environnement``,
        ``sources_present``, ``in_cyberark``, ``in_sentinel`` and ``in_itop``.
        Rows with no host name (column E), no COMP1 value (column J), or a
        COMP1 value containing 'Qualys' are skipped.

    Raises:
        SystemExit: when the workbook has no 'RECAP' sheet.
    """
    wb = openpyxl.load_workbook(xlsx_path, data_only=True, read_only=True)
    # A read-only workbook keeps the file handle open until close() is called,
    # so release it on every exit path (including the SystemExit below).
    try:
        if "RECAP" not in wb.sheetnames:
            raise SystemExit(f"[ERR] Sheet 'RECAP' introuvable. Sheets: {wb.sheetnames}")
        ws = wb["RECAP"]

        missing = []
        for row in ws.iter_rows(min_row=2, values_only=True):
            # Rows can be shorter than expected; _cell() yields None instead
            # of raising IndexError.
            raw_host = _cell(row, 4)  # col E = Concatenation
            if not raw_host:
                continue
            hostname = str(raw_host).strip()
            if not hostname:
                # Whitespace-only cell: nothing usable as a key.
                continue

            # str() guards against non-text cells (numbers, dates).
            env = str(_cell(row, 2) or "").strip() or None

            comp = _cell(row, 9)  # col J = COMP1
            if not comp:
                continue
            comp = str(comp)
            if "Qualys" in comp:
                continue  # already in Qualys -> not kept

            missing.append({
                "hostname": hostname,
                "environnement": env,
                "sources_present": comp,
                "in_cyberark": "Cyberark" in comp,
                "in_sentinel": "S1" in comp,
                "in_itop": "ITOP" in comp,
            })
        return missing
    finally:
        wb.close()


# Insert-or-update keyed on hostname_norm. Source flags are always refreshed;
# reason_category / status / priority are only overwritten while the row is
# still 'a_traiter' (or has no category), and reason_detail only when NULL, so
# values entered by a user are not clobbered by a re-import.
SQL_UPSERT = text("""
    INSERT INTO qualys_missing_servers (
        hostname, environnement, sources_present,
        in_cyberark, in_sentinel, in_itop,
        server_id, reason_category, reason_detail, status, priority,
        source_file, last_seen_at, created_at, updated_at
    ) VALUES (
        :hostname, :environnement, :sources_present,
        :in_cyberark, :in_sentinel, :in_itop,
        :server_id, :reason_category, :reason_detail, :status, :priority,
        :source_file, now(), now(), now()
    )
    ON CONFLICT (hostname_norm) DO UPDATE SET
        environnement = EXCLUDED.environnement,
        sources_present = EXCLUDED.sources_present,
        in_cyberark = EXCLUDED.in_cyberark,
        in_sentinel = EXCLUDED.in_sentinel,
        in_itop = EXCLUDED.in_itop,
        server_id = COALESCE(EXCLUDED.server_id, qualys_missing_servers.server_id),
        -- reason/status/priority: ne pas ecraser si l'utilisateur a deja saisi
        reason_category = CASE
            WHEN qualys_missing_servers.reason_category IS NULL
                 OR qualys_missing_servers.status = 'a_traiter'
            THEN EXCLUDED.reason_category
            ELSE qualys_missing_servers.reason_category END,
        reason_detail = CASE
            WHEN qualys_missing_servers.reason_detail IS NULL
            THEN EXCLUDED.reason_detail
            ELSE qualys_missing_servers.reason_detail END,
        status = CASE
            WHEN qualys_missing_servers.status = 'a_traiter'
            THEN EXCLUDED.status
            ELSE qualys_missing_servers.status END,
        priority = CASE
            WHEN qualys_missing_servers.status = 'a_traiter'
            THEN EXCLUDED.priority
            ELSE qualys_missing_servers.priority END,
        source_file = EXCLUDED.source_file,
        last_seen_at = now(),
        updated_at = now()
""")

# Looks up an existing servers row by lower-cased hostname or fqdn.
SQL_LINK_SERVER = text("""
    SELECT id FROM servers
    WHERE lower(hostname) = :h OR lower(fqdn) = :h
    LIMIT 1
""")


def main():
    """Parse the comparison workbook and upsert every host missing from Qualys.

    The workbook path comes from the first command-line argument, else from
    find_comparison_file(). Exits with status 1 when no file is found. All
    upserts run inside a single transaction.
    """
    xlsx = sys.argv[1] if len(sys.argv) > 1 else find_comparison_file()
    if not xlsx or not os.path.exists(xlsx):
        print("[ERR] Fichier comparaison introuvable. Place comparaisonv2.xlsx dans deploy/")
        sys.exit(1)

    print(f"[INFO] Fichier: {xlsx}")
    missing = parse_recap(xlsx)
    print(f"[INFO] Serveurs hors Qualys parses: {len(missing)}")

    engine = create_engine(DATABASE_URL)
    # Only the part after the last '@' is printed, which leaves out the
    # user:password section of the URL.
    print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")

    cat_count = Counter()
    linked = 0
    source_file = os.path.basename(xlsx)

    with engine.begin() as conn:
        for m in missing:
            # Link to the servers table (when a matching row already exists)
            link = conn.execute(SQL_LINK_SERVER, {"h": m["hostname"].lower()}).fetchone()
            m["server_id"] = link[0] if link else None
            if link:
                linked += 1

            cat, status, prio, detail = categorize(m["hostname"], m["sources_present"])
            cat_count[cat] += 1

            params = {
                **m,
                "reason_category": cat,
                "reason_detail": detail,
                "status": status,
                "priority": prio,
                "source_file": source_file,
            }
            conn.execute(SQL_UPSERT, params)

    print(f"[OK] Upsert termine — {len(missing)} lignes")
    print(f"[INFO] Lies a un server existant (servers.id): {linked}")
    print("[INFO] Repartition categories:")
    for cat, c in cat_count.most_common():
        print(f" {c:4d} {cat}")
    print()
    print("[INFO] Verifs:")
    print(" SELECT reason_category, status, COUNT(*) FROM qualys_missing_servers GROUP BY 1,2 ORDER BY 1,2;")
    print(" SELECT hostname, sources_present, reason_category FROM qualys_missing_servers WHERE status='a_enroler' AND priority=1;")


if __name__ == "__main__":
    main()