patchcenter/tools/import_applications_ioda.py
Admin MPCZ b55e8d4e26 Import IODA applications + table qualys_missing_servers
- Migration deploy/migrations/2026-04-16_ioda_qualys_missing.sql:
  * ALTER applications: 13 colonnes ioda_* (libelle, code_pos, type, statut,
    perimetre, dept_domaine, resp_metier, resp_dsi, nb_components, etc.)
  * Index unique sur ioda_libelle (cle d'upsert idempotente)
  * Nouvelle table qualys_missing_servers avec server_id FK,
    reason_category enum (appliance/ot_scada/virtualisation/oubli/...),
    status (a_traiter/a_enroler/exempt/enrole/decom), priority 1-5,
    trigger updated_at

- tools/import_applications_ioda.py: lit deploy/ServeursAssoci*IODA*.xlsx
  sheet "Services Metiers", upsert sur ioda_libelle

- tools/import_qualys_missing.py: lit deploy/comparaison*.xlsx sheet
  RECAP, filtre col J (COMP1) sans 'Qualys', categorise auto via
  heuristique nom (BAC_/BEU_/... = ot_scada, esx*/vp*esx = virtu,
  presence 3 sources sans Qualys = oubli urgent), lien auto vers
  servers.id si match hostname/fqdn

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 13:59:53 +02:00

143 lines
6.0 KiB
Python

"""Import applications depuis le fichier IODA Sanef.
Lit `deploy/ServeursAssociesAuxServicesIODA_*.xlsx` (sheet "Services Metiers")
et UPSERT dans la table `applications` en utilisant `ioda_libelle` comme cle.
Usage:
python tools/import_applications_ioda.py [chemin_fichier.xlsx]
Si chemin omis: cherche dans deploy/ le plus recent
ServeursAssoci*AuxServicesIODA_*.xlsx.
"""
import os
import sys
import glob
from pathlib import Path
from datetime import datetime
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def find_ioda_file():
"""Trouve le fichier IODA le plus recent dans deploy/."""
pattern = str(ROOT / "deploy" / "ServeursAssoci*AuxServicesIODA_*.xlsx")
files = sorted(glob.glob(pattern))
if not files:
# Fallback: deploy/ sans variantes accent
files = sorted(glob.glob(str(ROOT / "deploy" / "*IODA*.xlsx")))
return files[-1] if files else None
def parse_services(xlsx_path):
"""Renvoie liste de dicts {libelle, lib_court, code_pos, ...} depuis sheet 'Services Metiers'."""
wb = openpyxl.load_workbook(xlsx_path, data_only=True)
ws_name = next((s for s in wb.sheetnames if "Services" in s and "Metier" in s), None)
if not ws_name:
raise SystemExit(f"[ERR] Sheet 'Services Metiers' introuvable. Sheets: {wb.sheetnames}")
ws = wb[ws_name]
services = []
for i, row in enumerate(ws.iter_rows(values_only=True)):
# Header sur 2 lignes (i=0 mega-header, i=1 vrais en-tetes)
if i < 2:
continue
# Cols: 0 Actions, 1 Type, 2 Statut, 3 Code Zone POS, 4 Lib court, 5 Libelle,
# 6 Alias, 7 Editeur, 8 Description, 9 Commentaire, 10 Nb composants,
# 11 Perimetre, 12 Dept/Domaine, 13 Resp Metier, 14 Resp DSI
libelle = row[5]
if not libelle or not str(libelle).strip():
continue
services.append({
"libelle": str(libelle).strip(),
"type": (row[1] or "").strip() if row[1] else None,
"statut": (row[2] or "").strip() if row[2] else None,
"code_pos": (row[3] or "").strip() if row[3] else None,
"lib_court": (row[4] or "").strip() if row[4] else None,
"alias": (row[6] or "").strip() if row[6] else None,
"editeur": (row[7] or "").strip() if row[7] else None,
"description": (row[8] or "").strip() if row[8] else None,
"commentaire": (row[9] or "").strip() if row[9] else None,
"nb_components": int(row[10]) if row[10] and str(row[10]).strip().isdigit() else None,
"perimetre": (row[11] or "").strip() if row[11] else None,
"dept_domaine": (row[12] or "").strip() if row[12] else None,
"resp_metier": (row[13] or "").strip() if row[13] else None,
"resp_dsi": (row[14] or "").strip() if row[14] else None,
})
return services
SQL_UPSERT = text("""
INSERT INTO applications (
nom_court, nom_complet, description, editeur, criticite,
ioda_libelle, ioda_lib_court, ioda_code_pos, ioda_type, ioda_statut,
ioda_alias, ioda_perimetre, ioda_dept_domaine, ioda_resp_metier,
ioda_resp_dsi, ioda_nb_components, ioda_commentaire, ioda_imported_at,
created_at, updated_at
) VALUES (
:nom_court, :nom_complet, :description, :editeur, 'standard',
:libelle, :lib_court, :code_pos, :type, :statut,
:alias, :perimetre, :dept_domaine, :resp_metier,
:resp_dsi, :nb_components, :commentaire, now(),
now(), now()
)
ON CONFLICT (ioda_libelle) WHERE ioda_libelle IS NOT NULL
DO UPDATE SET
nom_complet = EXCLUDED.nom_complet,
description = COALESCE(EXCLUDED.description, applications.description),
editeur = COALESCE(EXCLUDED.editeur, applications.editeur),
ioda_lib_court = EXCLUDED.ioda_lib_court,
ioda_code_pos = EXCLUDED.ioda_code_pos,
ioda_type = EXCLUDED.ioda_type,
ioda_statut = EXCLUDED.ioda_statut,
ioda_alias = EXCLUDED.ioda_alias,
ioda_perimetre = EXCLUDED.ioda_perimetre,
ioda_dept_domaine = EXCLUDED.ioda_dept_domaine,
ioda_resp_metier = EXCLUDED.ioda_resp_metier,
ioda_resp_dsi = EXCLUDED.ioda_resp_dsi,
ioda_nb_components = EXCLUDED.ioda_nb_components,
ioda_commentaire = EXCLUDED.ioda_commentaire,
ioda_imported_at = now(),
updated_at = now()
""")
def main():
xlsx = sys.argv[1] if len(sys.argv) > 1 else find_ioda_file()
if not xlsx or not os.path.exists(xlsx):
print(f"[ERR] Fichier IODA introuvable. Place-le dans deploy/ (ex: deploy/ServeursAssociesAuxServicesIODA_YYYYMMDD.xlsx)")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
services = parse_services(xlsx)
print(f"[INFO] Services parses: {len(services)}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
inserted = updated = 0
with engine.begin() as conn:
for svc in services:
# nom_court max 50 chars: prefere lib_court, sinon truncate libelle
nom_court = (svc["lib_court"] or svc["libelle"])[:50]
params = {
**svc,
"nom_court": nom_court,
"nom_complet": svc["libelle"][:200],
}
r = conn.execute(SQL_UPSERT, params)
# Discriminer insert vs update via xmin trick? Plus simple: compter rowcount
inserted += 1 # upsert ne distingue pas, on log la totale
print(f"[OK] Upsert termine — {inserted} services traites")
print(f"[INFO] Verif: SELECT COUNT(*) FROM applications WHERE ioda_libelle IS NOT NULL;")
if __name__ == "__main__":
main()