- Migration deploy/migrations/2026-04-16_ioda_qualys_missing.sql:
* ALTER applications: 13 colonnes ioda_* (libelle, code_pos, type, statut,
perimetre, dept_domaine, resp_metier, resp_dsi, nb_components, etc.)
* Index unique sur ioda_libelle (cle d'upsert idempotente)
* Nouvelle table qualys_missing_servers avec server_id FK,
reason_category enum (appliance/ot_scada/virtualisation/oubli/...),
status (a_traiter/a_enroler/exempt/enrole/decom), priority 1-5,
trigger updated_at
- tools/import_applications_ioda.py: lit deploy/ServeursAssoci*IODA*.xlsx
sheet "Services Metiers", upsert sur ioda_libelle
- tools/import_qualys_missing.py: lit deploy/comparaison*.xlsx sheet
RECAP, filtre col J (COMP1) sans 'Qualys', categorise auto via
heuristique nom (BAC_/BEU_/... = ot_scada, esx*/vp*esx = virtu,
presence 3 sources sans Qualys = oubli urgent), lien auto vers
servers.id si match hostname/fqdn
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
205 lines
7.8 KiB
Python
205 lines
7.8 KiB
Python
"""Import des serveurs absents de Qualys depuis comparaisonv2.xlsx.

Lit la sheet 'RECAP' (col J = COMP1) du fichier deploy/comparaisonv2.xlsx,
filtre les lignes ou 'Qualys' n'est PAS dans la chaine, et UPSERT dans
la table `qualys_missing_servers`.

Categorisation auto via heuristique sur le nom (prefixes connus):
  - virtualisation : ESXi/hyperviseurs (commencent souvent par 'esx', 'vp*esx')
  - ot_scada       : OT bord de route (BAC_, BEU_, BOA_, BOP_, BUC_, CAG_, *_SRV_*)
  - oubli          : present partout SAUF Qualys (Cyberark + S1 + ITOP) → priorite 1
  - inconnu        : autres cas, a investiguer

Usage:
    python tools/import_qualys_missing.py [chemin_fichier.xlsx]
"""
|
|
import os
|
|
import sys
|
|
import glob
|
|
import re
|
|
from pathlib import Path
|
|
from collections import Counter
|
|
|
|
import openpyxl
|
|
from sqlalchemy import create_engine, text
|
|
|
|
# Repository root: two directory levels above this file (the script is run as
# tools/import_qualys_missing.py, see the usage line in the module docstring).
ROOT = Path(__file__).resolve().parent.parent

# Connection URL: first non-empty value among $DATABASE_URL_DEMO,
# $DATABASE_URL, then a built-in local default.
# NOTE(review): the fallback embeds a password in source control; consider
# requiring the environment variable instead - confirm with the owners before
# changing, since callers may rely on the default.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db"
)
|
|
|
|
|
|
def find_comparison_file(root=None):
    """Locate the newest ``comparaison*.xlsx`` workbook under ``<root>/deploy``.

    Args:
        root: Directory that contains the ``deploy`` folder. Defaults to the
            module-level ``ROOT`` when omitted.

    Returns:
        The path (``str``) of the matching file whose name ranks highest in
        natural order, or ``None`` when no file matches.
    """
    deploy_dir = (ROOT if root is None else Path(root)) / "deploy"
    candidates = glob.glob(str(deploy_dir / "comparaison*.xlsx"))
    if not candidates:
        return None

    def natural_key(path):
        # Split the file name into text/number runs so embedded version
        # numbers compare numerically: "v10" must rank after "v2", which a
        # plain string sort gets wrong. re.split with a capturing group puts
        # the digit runs at the odd indexes, so types always line up.
        name = os.path.basename(path)
        chunks = re.split(r"(\d+)", name)
        return [int(c) if i % 2 else c for i, c in enumerate(chunks)]

    return max(candidates, key=natural_key)
|
|
|
|
|
|
# --- Heuristiques de categorisation --------------------------------------
|
|
# Host names made of a known site prefix followed by "_L<n>_S<n>" (case-insensitive).
RE_OT_SCADA = re.compile(r"^(BAC|BEU|BOA|BOP|BUC|CAG|HEU|FLA|FLB|FLC|FLD|FLE|FLF|FLG|FLH|GAR|MEU|PAU|REI)_L\d+_S\d+", re.I)
# Host names starting with a hypervisor-style prefix (case-insensitive).
RE_VIRTU = re.compile(r"^(esx|vp.*esx|hyp|hv\d|esxi)", re.I)


def categorize(hostname, sources_present):
    """Classify a server that is missing from Qualys.

    Rules are evaluated in order and the first match wins:

    1. host name matches ``RE_OT_SCADA``          -> ot_scada / exempt / 4
    2. host name matches ``RE_VIRTU``             -> virtualisation / exempt / 5
    3. sources mention Cyberark, S1 and ITOP      -> oubli / a_enroler / 1
    4. sources mention S1 and ITOP                -> oubli / a_enroler / 2
    5. sources are exactly "ITOP seulement"       -> inconnu / a_traiter / 3
    6. sources are exactly "S1 seulement"         -> inconnu / a_traiter / 2
    7. anything else                              -> inconnu / a_traiter / 3

    Args:
        hostname: Server name; ``None`` is treated as an empty name.
        sources_present: Text listing the inventories that know the server
            (the COMP1 cell). ``None``/empty means "no source".

    Returns:
        Tuple ``(reason_category, status, priority, reason_detail)``.
    """
    host = (hostname or "").strip()
    # Surrounding whitespace is common in spreadsheet cells and would defeat
    # the exact-match rules 5 and 6, so normalise once up front.
    sources = str(sources_present).strip() if sources_present else ""

    # 1. Roadside OT / SCADA equipment.
    if RE_OT_SCADA.match(host):
        return ("ot_scada", "exempt", 4,
                "Equipement bord de route / OT (pas d'agent Qualys possible)")

    # 2. Virtualisation hosts (the pattern is already case-insensitive).
    if RE_VIRTU.match(host):
        return ("virtualisation", "exempt", 5,
                "Hyperviseur ESXi (scan via Qualys connector vCenter, pas d'agent)")

    in_s1_and_itop = "S1" in sources and "ITOP" in sources

    # 3. Known everywhere except Qualys: plain enrolment oversight (urgent).
    if in_s1_and_itop and "Cyberark" in sources:
        return ("oubli", "a_enroler", 1,
                "Present sur CyberArk + Sentinel + iTop, manque Qualys (oubli enrolement)")

    # 4. Sentinel + iTop without CyberArk.
    if in_s1_and_itop:
        return ("oubli", "a_enroler", 2,
                "Sentinel + iTop OK, manque CyberArk + Qualys")

    # 5. iTop only: orphan entry to investigate.
    if sources == "ITOP seulement":
        return ("inconnu", "a_traiter", 3,
                "Reference uniquement dans iTop. Serveur eteint? Decom? A verifier")

    # 6. Sentinel only: agent seen but no iTop reference.
    if sources == "S1 seulement":
        return ("inconnu", "a_traiter", 2,
                "Detecte par Sentinel mais pas iTop. Shadow IT? Asset non reference")

    # 7. Fallback: report whatever sources were listed.
    return ("inconnu", "a_traiter", 3, f"Sources presentes: {sources or 'aucune'}")
|
|
|
|
|
|
def parse_recap(xlsx_path):
    """Return one dict per 'RECAP' row whose COMP1 value does not mention Qualys.

    Columns read (0-based index in the row tuple): 2 = environment,
    4 = host name, 9 = COMP1 (inventories that know the server). The first
    sheet row is skipped as a header.

    Args:
        xlsx_path: Path of the comparison workbook.

    Returns:
        List of dicts with keys ``hostname``, ``environnement``,
        ``sources_present``, ``in_cyberark``, ``in_sentinel``, ``in_itop``.

    Raises:
        SystemExit: The workbook has no sheet named 'RECAP'.
    """

    def cell_text(row, index):
        # Rows may be shorter than expected and cells may hold non-text
        # values; normalise both cases to a stripped string.
        if not row or index >= len(row) or row[index] is None:
            return ""
        return str(row[index]).strip()

    wb = openpyxl.load_workbook(xlsx_path, data_only=True, read_only=True)
    try:
        if "RECAP" not in wb.sheetnames:
            raise SystemExit(f"[ERR] Sheet 'RECAP' introuvable. Sheets: {wb.sheetnames}")

        missing = []
        for row in wb["RECAP"].iter_rows(min_row=2, values_only=True):
            hostname = cell_text(row, 4)  # col E = Concatenation
            if not hostname:
                continue

            comp = cell_text(row, 9)  # col J = COMP1
            if not comp or "Qualys" in comp:
                continue  # already known to Qualys (or no data): not kept

            missing.append({
                "hostname": hostname,
                "environnement": cell_text(row, 2) or None,
                "sources_present": comp,
                "in_cyberark": "Cyberark" in comp,
                "in_sentinel": "S1" in comp,
                "in_itop": "ITOP" in comp,
            })
        return missing
    finally:
        # Read-only workbooks keep the file handle open until closed explicitly.
        wb.close()
|
|
|
|
|
|
# Upsert of one row into qualys_missing_servers, keyed on hostname_norm.
# NOTE(review): hostname_norm is not among the inserted columns; presumably it
# is derived by the database (see the migration) - confirm.
# On conflict:
#   - environnement, sources_present, in_* flags, source_file, last_seen_at and
#     updated_at are always refreshed;
#   - server_id keeps the stored value when the new one is NULL;
#   - reason_category is replaced only if it was NULL or status is 'a_traiter';
#   - reason_detail is replaced only if it was NULL;
#   - status and priority are replaced only while status is 'a_traiter'.
SQL_UPSERT = text("""
    INSERT INTO qualys_missing_servers (
        hostname, environnement, sources_present,
        in_cyberark, in_sentinel, in_itop,
        server_id, reason_category, reason_detail, status, priority,
        source_file, last_seen_at, created_at, updated_at
    ) VALUES (
        :hostname, :environnement, :sources_present,
        :in_cyberark, :in_sentinel, :in_itop,
        :server_id, :reason_category, :reason_detail, :status, :priority,
        :source_file, now(), now(), now()
    )
    ON CONFLICT (hostname_norm) DO UPDATE SET
        environnement = EXCLUDED.environnement,
        sources_present = EXCLUDED.sources_present,
        in_cyberark = EXCLUDED.in_cyberark,
        in_sentinel = EXCLUDED.in_sentinel,
        in_itop = EXCLUDED.in_itop,
        server_id = COALESCE(EXCLUDED.server_id, qualys_missing_servers.server_id),
        -- reason/status/priority: ne pas ecraser si l'utilisateur a deja saisi
        reason_category = CASE
            WHEN qualys_missing_servers.reason_category IS NULL OR qualys_missing_servers.status = 'a_traiter'
            THEN EXCLUDED.reason_category ELSE qualys_missing_servers.reason_category END,
        reason_detail = CASE
            WHEN qualys_missing_servers.reason_detail IS NULL
            THEN EXCLUDED.reason_detail ELSE qualys_missing_servers.reason_detail END,
        status = CASE
            WHEN qualys_missing_servers.status = 'a_traiter'
            THEN EXCLUDED.status ELSE qualys_missing_servers.status END,
        priority = CASE
            WHEN qualys_missing_servers.status = 'a_traiter'
            THEN EXCLUDED.priority ELSE qualys_missing_servers.priority END,
        source_file = EXCLUDED.source_file,
        last_seen_at = now(),
        updated_at = now()
""")
|
|
|
|
|
|
# Looks up at most one servers.id whose lower-cased hostname or fqdn equals :h.
# The comparison lower-cases only the column side, so :h must be passed
# already lower-cased.
SQL_LINK_SERVER = text("""
    SELECT id FROM servers
    WHERE lower(hostname) = :h OR lower(fqdn) = :h
    LIMIT 1
""")
|
|
|
|
|
|
def main():
    """Import the servers missing from Qualys into ``qualys_missing_servers``.

    The workbook path is taken from the first command-line argument, or else
    from ``find_comparison_file()``. Each parsed row is linked to an existing
    ``servers`` row when one matches, categorised, and upserted; all writes
    happen in a single transaction. A summary is printed at the end.

    Exits with status 1 when no workbook can be found.
    """
    xlsx = sys.argv[1] if len(sys.argv) > 1 else find_comparison_file()
    if not xlsx or not os.path.exists(xlsx):
        print("[ERR] Fichier comparaison introuvable. Place comparaisonv2.xlsx dans deploy/")
        sys.exit(1)

    print(f"[INFO] Fichier: {xlsx}")
    missing = parse_recap(xlsx)
    print(f"[INFO] Serveurs hors Qualys parses: {len(missing)}")

    engine = create_engine(DATABASE_URL)
    # Print only what follows the last '@' so the credentials are not echoed.
    print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")

    cat_count = Counter()
    linked = 0
    source_file = os.path.basename(xlsx)

    # engine.begin() commits on success and rolls back if any row fails.
    with engine.begin() as conn:
        for m in missing:
            # Link to an existing servers row, if there is one.
            link = conn.execute(SQL_LINK_SERVER, {"h": m["hostname"].lower()}).fetchone()
            m["server_id"] = link[0] if link else None
            if link:
                linked += 1

            cat, status, prio, detail = categorize(m["hostname"], m["sources_present"])
            cat_count[cat] += 1

            params = {
                **m,
                "reason_category": cat,
                "reason_detail": detail,
                "status": status,
                "priority": prio,
                "source_file": source_file,
            }
            conn.execute(SQL_UPSERT, params)

    print(f"[OK] Upsert termine — {len(missing)} lignes")
    print(f"[INFO] Lies a un server existant (servers.id): {linked}")
    print("[INFO] Repartition categories:")
    for cat, c in cat_count.most_common():
        print(f" {c:4d} {cat}")
    print()
    print("[INFO] Verifs:")
    print(" SELECT reason_category, status, COUNT(*) FROM qualys_missing_servers GROUP BY 1,2 ORDER BY 1,2;")
    print(" SELECT hostname, sources_present, reason_category FROM qualys_missing_servers WHERE status='a_enroler' AND priority=1;")


if __name__ == "__main__":
    main()
|