"""Tag servers.zone_id = DMZ depuis la colonne DMZ du fichier Ayoub. Lit sheet 'Serveurs patchables 2026' colonne DMZ (booleen True/False ou oui/non). Cree la zone 'DMZ' si absente (is_dmz=true). Usage: python tools/align_dmz_from_ayoub.py [--dry-run] """ import os import argparse from sqlalchemy import create_engine, text try: import openpyxl except ImportError: print("[ERR] pip install openpyxl") raise DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" def is_dmz_value(v): if v is None: return False if isinstance(v, bool): return v s = str(v).strip().lower() if s in ("true", "vrai", "oui", "yes", "1", "dmz", "x"): return True # SANEF: 'Exposition internet', 'Exposition Internet', 'Expo indirecte' if "exposition" in s or "expo indirecte" in s or "exposé" in s or "expose" in s: return True return False def main(): parser = argparse.ArgumentParser() parser.add_argument("xlsx_path") parser.add_argument("--sheet", default="Serveurs patchables 2026") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") # Recupere / cree la zone DMZ dmz = conn.execute(text( "SELECT id FROM zones WHERE is_dmz=true OR LOWER(name)='dmz' ORDER BY id LIMIT 1" )).fetchone() if dmz: dmz_id = dmz.id print(f"[INFO] Zone DMZ existante (id={dmz_id})") else: if args.dry_run: print("[DRY] Zone DMZ sera creee") dmz_id = -1 else: conn.execute(text( "INSERT INTO zones (name, description, is_dmz) VALUES ('DMZ', 'Zone DMZ - patching prioritaire', true)" )) dmz_id = conn.execute(text( "SELECT id FROM zones WHERE is_dmz=true ORDER BY id LIMIT 1" )).fetchone().id print(f"[INFO] Zone DMZ creee (id={dmz_id})") wb = openpyxl.load_workbook(args.xlsx_path, data_only=True) if args.sheet not in wb.sheetnames: print(f"[ERR] Sheet '{args.sheet}' introuvable. Sheets: {wb.sheetnames}") return ws = wb[args.sheet] header = [str(c.value).strip() if c.value else "" for c in ws[1]] try: idx_host = header.index("Asset Name") except ValueError: try: idx_host = header.index("Hostname") except ValueError: print("[ERR] Colonne Asset Name/Hostname introuvable") return try: idx_dmz = header.index("DMZ") except ValueError: print("[ERR] Colonne DMZ introuvable") return print(f"[INFO] idx_host={idx_host} idx_dmz={idx_dmz}") stats = {"tagged": 0, "already_dmz": 0, "not_dmz": 0, "not_found": 0} for row in ws.iter_rows(min_row=2, values_only=True): hostname = row[idx_host] if idx_host < len(row) else None if not hostname: continue hostname = str(hostname).strip().split(".")[0].lower() if not any(c.isalpha() for c in hostname): continue is_dmz = is_dmz_value(row[idx_dmz] if idx_dmz < len(row) else None) if not is_dmz: stats["not_dmz"] += 1 continue srv = conn.execute(text("SELECT id, zone_id FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone() if not srv: stats["not_found"] += 1 continue if srv.zone_id == dmz_id: stats["already_dmz"] += 1 continue if args.dry_run: print(f" DRY: {hostname} (zone {srv.zone_id} -> DMZ)") else: conn.execute(text("UPDATE servers SET zone_id=:z WHERE id=:sid"), {"z": dmz_id, "sid": srv.id}) stats["tagged"] += 1 conn.close() print(f"\n[DONE] Tagged DMZ: {stats['tagged']} | Deja DMZ: {stats['already_dmz']} " f"| Non-DMZ: {stats['not_dmz']} | Hors base: {stats['not_found']}") if __name__ == "__main__": main()