"""Alignement servers depuis fichier Excel Ayoub (Planning Patching 2026_ayoub.xlsx). Lit le sheet 'Serveurs patchables 2026' et aligne via JOINTURE: 1. Auto-cree les domains/environments absents (valeurs iTop verbatim) 2. Auto-cree les paires domain_environments 3. Set servers.domain_env_id 4. Sync servers.environnement (plain-text pour filtre) 5. Set domain_environments.responsable_nom / referent_nom Colonnes Excel lues: - Asset Name -> servers.hostname (match) - Domaine -> domains.name (code auto-genere) - Environnement -> environments.name (code auto-genere) - Responsable Domaine DTS -> domain_environments.responsable_nom - Referent technique -> domain_environments.referent_nom Usage: python tools/align_from_ayoub.py [--sheet "Serveurs patchables 2026"] [--dry-run] Requiert openpyxl: pip install openpyxl """ import os import re import argparse import unicodedata from sqlalchemy import create_engine, text try: import openpyxl except ImportError: print("[ERR] Installer openpyxl: pip install openpyxl") raise DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" NBSP = "\u00a0" def clean(v): if v is None: return None s = str(v).replace(NBSP, " ").strip() return s or None # Labels iTop officiels pour normaliser les variantes Excel ITOP_ENVS = ["Développement", "Intégration", "Pré-Prod", "Production", "Recette", "Test", "Formation"] def norm_env(raw): """Normalise 'production'/'Test 1'/'Preprod' vers les 7 labels iTop.""" if not raw: return None s = raw.strip() # Enleve suffixe numerique (Test 1 -> Test, Recette 2 -> Recette) s = re.sub(r"\s+\d+$", "", s) # Match exact if s in ITOP_ENVS: return s # Match case-insensitive + variantes low = s.lower().replace("-", "").replace(" ", "") aliases = { "developpement": "Développement", "dev": "Développement", "integration": "Intégration", "int": "Intégration", "preprod": "Pré-Prod", "preproduction": "Pré-Prod", "prod": "Production", "production": "Production", "recette": "Recette", "rec": "Recette", "test": "Test", "tests": "Test", "formation": "Formation", } return aliases.get(low) def slugify(s, maxlen=10): """Slug ASCII lowercase pour code (domain.code varchar(10)).""" if not s: return None nfkd = unicodedata.normalize("NFKD", s) ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c)) ascii_str = re.sub(r"[^a-zA-Z0-9]+", "", ascii_str).lower() return ascii_str[:maxlen] or None def get_or_create_domain(conn, name): row = conn.execute(text("SELECT id, code FROM domains WHERE name=:n"), {"n": name}).fetchone() if row: return row.id code = slugify(name, 10) # Eviter collision de code suffix = 0 base_code = code while conn.execute(text("SELECT 1 FROM domains WHERE code=:c"), {"c": code}).fetchone(): suffix += 1 code = (base_code[: 10 - len(str(suffix))] + str(suffix)) conn.execute(text( "INSERT INTO domains (name, code, display_order) VALUES (:n, :c, 99)" ), {"n": name, "c": code}) return conn.execute(text("SELECT id FROM domains WHERE name=:n"), {"n": name}).fetchone().id def get_or_create_env(conn, name): row = conn.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": name}).fetchone() if row: return row.id code = slugify(name, 10) suffix = 0 base_code = code while conn.execute(text("SELECT 1 FROM environments WHERE code=:c"), {"c": code}).fetchone(): suffix += 1 code = (base_code[: 10 - len(str(suffix))] + str(suffix)) conn.execute(text( "INSERT INTO environments (name, code, display_order) VALUES (:n, :c, 99)" ), {"n": name, "c": code}) return conn.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": name}).fetchone().id def get_or_create_dom_env(conn, domain_id, env_id): row = conn.execute(text( "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e" ), {"d": domain_id, "e": env_id}).fetchone() if row: return row.id conn.execute(text( "INSERT INTO domain_environments (domain_id, environment_id) VALUES (:d, :e)" ), {"d": domain_id, "e": env_id}) return conn.execute(text( "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e" ), {"d": domain_id, "e": env_id}).fetchone().id def main(): parser = argparse.ArgumentParser() parser.add_argument("xlsx_path") parser.add_argument("--sheet", default="Serveurs patchables 2026") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") print(f"[INFO] Fichier: {args.xlsx_path}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") wb = openpyxl.load_workbook(args.xlsx_path, data_only=True) if args.sheet not in wb.sheetnames: print(f"[ERR] Sheet '{args.sheet}' introuvable. Sheets: {wb.sheetnames}") return ws = wb[args.sheet] header = [clean(c.value) for c in ws[1]] def col_idx(candidates): for i, h in enumerate(header): if h and any(c.lower() in h.lower() for c in candidates): return i return -1 idx_host = col_idx(["Asset Name", "Hostname"]) idx_env = col_idx(["Environnement"]) idx_dom = col_idx(["Domaine"]) idx_resp = col_idx(["Responsable Domaine"]) idx_ref = col_idx(["Référent technique", "Referent technique"]) print(f"[INFO] Indices: host={idx_host} env={idx_env} dom={idx_dom} " f"resp={idx_resp} ref={idx_ref}") if idx_host == -1: print("[ERR] Colonne Asset Name/Hostname introuvable") return stats = {"updated": 0, "dom_created": 0, "env_created": 0, "de_created": 0, "not_found": 0, "skipped": 0, "de_resp_updated": 0} seen_dom = {} seen_env = {} changes = [] for row in ws.iter_rows(min_row=2, values_only=True): hostname = clean(row[idx_host]) if idx_host < len(row) else None if not hostname or not any(c.isalpha() for c in hostname): stats["skipped"] += 1 continue hostname = hostname.split(".")[0].lower() dom_name = clean(row[idx_dom]) if idx_dom >= 0 and idx_dom < len(row) else None env_raw = clean(row[idx_env]) if idx_env >= 0 and idx_env < len(row) else None env_name = norm_env(env_raw) # Normalisation vers 7 labels iTop resp = clean(row[idx_resp]) if idx_resp >= 0 and idx_resp < len(row) else None ref = clean(row[idx_ref]) if idx_ref >= 0 and idx_ref < len(row) else None if resp: resp = re.sub(r"\s+", " ", resp)[:100] if ref: ref = re.sub(r"\s+", " ", ref)[:100] srv = conn.execute(text("SELECT id, domain_env_id, environnement " "FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone() if not srv: stats["not_found"] += 1 continue # Auto-create domain/env/de de_id = srv.domain_env_id if dom_name and env_name: if dom_name not in seen_dom: before = conn.execute(text("SELECT COUNT(*) FROM domains")).scalar() if not args.dry_run: did = get_or_create_domain(conn, dom_name) else: row_ = conn.execute(text("SELECT id FROM domains WHERE name=:n"), {"n": dom_name}).fetchone() did = row_.id if row_ else -1 after = conn.execute(text("SELECT COUNT(*) FROM domains")).scalar() if after > before: stats["dom_created"] += 1 seen_dom[dom_name] = did if env_name not in seen_env: before = conn.execute(text("SELECT COUNT(*) FROM environments")).scalar() if not args.dry_run: eid = get_or_create_env(conn, env_name) else: row_ = conn.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": env_name}).fetchone() eid = row_.id if row_ else -1 after = conn.execute(text("SELECT COUNT(*) FROM environments")).scalar() if after > before: stats["env_created"] += 1 seen_env[env_name] = eid did = seen_dom[dom_name] eid = seen_env[env_name] if did > 0 and eid > 0: if not args.dry_run: before = conn.execute(text("SELECT COUNT(*) FROM domain_environments")).scalar() de_id = get_or_create_dom_env(conn, did, eid) after = conn.execute(text("SELECT COUNT(*) FROM domain_environments")).scalar() if after > before: stats["de_created"] += 1 # Sync responsable/referent sur domain_environments (max 1 valeur — on garde la derniere vue) if resp or ref: up = {} if resp: up["resp"] = resp if ref: up["ref"] = ref sets = [] if "resp" in up: sets.append("responsable_nom=:resp") if "ref" in up: sets.append("referent_nom=:ref") if sets: up["id"] = de_id conn.execute(text( f"UPDATE domain_environments SET {', '.join(sets)} " f"WHERE id=:id AND (responsable_nom IS NULL OR referent_nom IS NULL)" ), up) stats["de_resp_updated"] += 1 # Update serveur: domain_env_id + environnement plain-text updates = {} if de_id and srv.domain_env_id != de_id: updates["domain_env_id"] = de_id if env_name and srv.environnement != env_name: updates["environnement"] = env_name[:50] if updates: if args.dry_run: changes.append((hostname, updates)) else: sets = ", ".join(f"{k}=:{k}" for k in updates) params = dict(updates); params["sid"] = srv.id conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"), params) stats["updated"] += 1 if args.dry_run and changes: print(f"\n[DRY-RUN] {len(changes)} serveurs a mettre a jour (premiers 20):") for h, u in changes[:20]: print(f" {h}: {u}") print(f"\n[DONE] servers maj: {stats['updated']} | domains crees: {stats['dom_created']} " f"| envs crees: {stats['env_created']} | (dom,env) crees: {stats['de_created']} " f"| resp/ref syncs: {stats['de_resp_updated']} | hors base: {stats['not_found']} " f"| skip: {stats['skipped']}") conn.close() if __name__ == "__main__": main()