patchcenter/tools/align_dmz_from_ayoub.py

131 lines
4.6 KiB
Python

"""Tag servers.zone_id = DMZ depuis la colonne DMZ du fichier Ayoub.
Lit sheet 'Serveurs patchables 2026' colonne DMZ (booleen True/False ou oui/non).
Cree la zone 'DMZ' si absente (is_dmz=true).
Usage:
python tools/align_dmz_from_ayoub.py <xlsx> [--dry-run]
"""
import os
import argparse
from sqlalchemy import create_engine, text
try:
import openpyxl
except ImportError:
print("[ERR] pip install openpyxl")
raise
# Connection string resolution order: DATABASE_URL_DEMO, then DATABASE_URL,
# then a local demo database.
# NOTE(review): the fallback embeds a password in source; prefer supplying the
# URL through the environment.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)
# Exact cell values (after strip + lowercase) that mean "this server is in the DMZ".
_DMZ_TRUE_VALUES = frozenset({"true", "vrai", "oui", "yes", "1", "dmz", "x"})
# Substrings identifying an exposure label such as 'Exposition internet',
# 'Exposition Internet' or 'Expo indirecte'.
_DMZ_EXPOSURE_MARKERS = ("exposition", "expo indirecte", "exposé", "expose")
# Leading words that negate an exposure label ('Non exposé', "Pas d'exposition").
_DMZ_NEGATION_PREFIXES = ("non", "pas ", "aucun", "sans ", "no ", "not ")


def is_dmz_value(v):
    """Return True when a spreadsheet cell value flags the server as DMZ.

    Args:
        v: Raw cell value (None, bool, number or anything convertible with str()).

    Returns:
        bool: True for a True boolean, the number 1, one of the accepted
        yes-style words, or a non-negated exposure label; False otherwise.
    """
    if v is None:
        return False
    # bool must be tested before the numeric branch: bool is a subclass of int.
    if isinstance(v, bool):
        return v
    # A numeric cell may come back as 1 or 1.0; str(1.0) == "1.0" would not
    # match the "1" entry below, so compare numerically instead.
    if isinstance(v, (int, float)):
        return v == 1
    s = str(v).strip().lower()
    if s in _DMZ_TRUE_VALUES:
        return True
    # Without this guard 'Non exposé' would match the 'exposé' marker and be
    # tagged as DMZ.
    if s.startswith(_DMZ_NEGATION_PREFIXES):
        return False
    return any(marker in s for marker in _DMZ_EXPOSURE_MARKERS)
def _pick_sheet(wb, requested):
    """Return the name of the sheet to read, or None when no usable sheet exists."""
    if requested:
        return requested if requested in wb.sheetnames else None
    for candidate in ("Histo-2025", "Serveurs patchables 2026", "Histo_2025"):
        if candidate in wb.sheetnames:
            return candidate
    return None


def _get_or_create_dmz_zone(conn, dry_run):
    """Return the id of the DMZ zone, inserting it first when missing (-1 in dry-run)."""
    dmz = conn.execute(text(
        "SELECT id FROM zones WHERE is_dmz=true OR LOWER(name)='dmz' ORDER BY id LIMIT 1"
    )).fetchone()
    if dmz:
        print(f"[INFO] Zone DMZ existante (id={dmz.id})")
        return dmz.id
    if dry_run:
        print("[DRY] Zone DMZ sera creee")
        # Placeholder id: nothing is written in dry-run, and -1 never equals a
        # real zone_id, so every DMZ-flagged server is reported as a change.
        return -1
    conn.execute(text(
        "INSERT INTO zones (name, description, is_dmz) VALUES ('DMZ', 'Zone DMZ - patching prioritaire', true)"
    ))
    dmz_id = conn.execute(text(
        "SELECT id FROM zones WHERE is_dmz=true ORDER BY id LIMIT 1"
    )).fetchone().id
    print(f"[INFO] Zone DMZ creee (id={dmz_id})")
    return dmz_id


def _tag_servers(conn, rows, idx_host, idx_dmz, dmz_id, dry_run):
    """Set zone_id=dmz_id on every DMZ-flagged server found in rows; return the counters."""
    # Statements are loop-invariant: build them once.
    select_server = text("SELECT id, zone_id FROM servers WHERE hostname=:h")
    update_zone = text("UPDATE servers SET zone_id=:z WHERE id=:sid")
    stats = {"tagged": 0, "already_dmz": 0, "not_dmz": 0, "not_found": 0}
    for row in rows:
        hostname = row[idx_host] if idx_host < len(row) else None
        if not hostname:
            continue
        # Keep only the short host name: drop anything after the first dot
        # and normalise case before the database lookup.
        hostname = str(hostname).strip().split(".")[0].lower()
        # Ignore cells that contain no letter at all.
        if not any(c.isalpha() for c in hostname):
            continue
        if not is_dmz_value(row[idx_dmz] if idx_dmz < len(row) else None):
            stats["not_dmz"] += 1
            continue
        srv = conn.execute(select_server, {"h": hostname}).fetchone()
        if not srv:
            stats["not_found"] += 1
            continue
        if srv.zone_id == dmz_id:
            stats["already_dmz"] += 1
            continue
        if dry_run:
            print(f" DRY: {hostname} (zone {srv.zone_id} -> DMZ)")
        else:
            conn.execute(update_zone, {"z": dmz_id, "sid": srv.id})
        # Counted in dry-run too, so the summary shows what would be tagged.
        stats["tagged"] += 1
    return stats


def main():
    """Command-line entry point: tag servers as DMZ from an Excel workbook.

    Parses the arguments (xlsx_path, --sheet, --dry-run), validates the
    workbook (sheet, host column, 'DMZ' column), then opens the database
    connection, resolves or creates the DMZ zone and updates servers.zone_id
    for every row whose DMZ cell is truthy according to is_dmz_value().

    The workbook is validated before any database access, so an invalid input
    file never creates the DMZ zone, and the connection is always released.
    Validation errors are printed and the function returns None.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("xlsx_path")
    parser.add_argument("--sheet", default=None,
                        help="Sheet a utiliser (auto: Histo-2025 ou Serveurs patchables 2026)")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Print only the part after the last '@' so credentials are not shown.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")

    wb = openpyxl.load_workbook(args.xlsx_path, data_only=True)
    sheet_name = _pick_sheet(wb, args.sheet)
    if sheet_name is None:
        print(f"[ERR] Sheet introuvable. Sheets disponibles: {wb.sheetnames}")
        return
    ws = wb[sheet_name]
    print(f"[INFO] Sheet utilisee: {sheet_name}")

    # The first row holds the column titles; empty cells become "".
    header = [str(c.value).strip() if c.value else "" for c in ws[1]]
    idx_host = next((i for i, h in enumerate(header) if h in ("Asset Name", "Hostname", "Nom")), -1)
    idx_dmz = next((i for i, h in enumerate(header) if h == "DMZ"), -1)
    if idx_host == -1:
        print(f"[ERR] Colonne Asset Name/Hostname/Nom introuvable. Headers: {header[:10]}")
        return
    if idx_dmz == -1:
        print(f"[ERR] Colonne DMZ introuvable. Headers: {header}")
        return
    print(f"[INFO] idx_host={idx_host} idx_dmz={idx_dmz}")

    # AUTOCOMMIT: each statement is committed as it runs. The context manager
    # closes the connection even when an exception interrupts the run.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        dmz_id = _get_or_create_dmz_zone(conn, args.dry_run)
        stats = _tag_servers(
            conn,
            ws.iter_rows(min_row=2, values_only=True),
            idx_host,
            idx_dmz,
            dmz_id,
            args.dry_run,
        )

    print(f"\n[DONE] Tagged DMZ: {stats['tagged']} | Deja DMZ: {stats['already_dmz']} "
          f"| Non-DMZ: {stats['not_dmz']} | Hors base: {stats['not_found']}")
# Run the alignment only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()