"""Alignement servers depuis fichier Excel Ayoub (Planning Patching 2026_ayoub.xlsx). Lit le sheet 'Serveurs patchables 2026' et aligne via JOINTURE: 1. Auto-cree les domains/environments absents (valeurs iTop verbatim) 2. Auto-cree les paires domain_environments 3. Set servers.domain_env_id 4. Sync servers.environnement (plain-text pour filtre) 5. Set domain_environments.responsable_nom / referent_nom Colonnes Excel lues: - Asset Name -> servers.hostname (match) - Domaine -> domains.name (code auto-genere) - Environnement -> environments.name (code auto-genere) - Responsable Domaine DTS -> domain_environments.responsable_nom - Referent technique -> domain_environments.referent_nom Usage: python tools/align_from_ayoub.py [--sheet "Serveurs patchables 2026"] [--dry-run] Requiert openpyxl: pip install openpyxl """ import os import re import argparse import unicodedata from sqlalchemy import create_engine, text try: import openpyxl except ImportError: print("[ERR] Installer openpyxl: pip install openpyxl") raise DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" NBSP = "\u00a0" def clean(v): if v is None: return None s = str(v).replace(NBSP, " ").strip() return s or None # Labels iTop officiels pour normaliser les variantes Excel ITOP_ENVS = ["Développement", "Intégration", "Pré-Prod", "Production", "Recette", "Test", "Formation"] def norm_env(raw): """Normalise 'production'/'Test 1'/'Preprod' vers les 7 labels iTop.""" if not raw: return None s = raw.strip() # Enleve suffixe numerique (Test 1 -> Test, Recette 2 -> Recette) s = re.sub(r"\s+\d+$", "", s) # Match exact if s in ITOP_ENVS: return s # Match case-insensitive + variantes low = s.lower().replace("-", "").replace(" ", "") aliases = { "developpement": "Développement", "dev": "Développement", "integration": "Intégration", "int": "Intégration", "preprod": "Pré-Prod", "preproduction": "Pré-Prod", "prod": "Production", "production": "Production", "recette": "Recette", "rec": "Recette", "test": "Test", "tests": "Test", "formation": "Formation", } return aliases.get(low) def slugify(s, maxlen=10): """Slug ASCII lowercase pour code (domain.code varchar(10)).""" if not s: return None nfkd = unicodedata.normalize("NFKD", s) ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c)) ascii_str = re.sub(r"[^a-zA-Z0-9]+", "", ascii_str).lower() return ascii_str[:maxlen] or None def norm_domain_key(s): """Cle de normalisation domaine: lowercase + sans accent + trim.""" if not s: return "" nfkd = unicodedata.normalize("NFKD", s.strip()) ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c)) return ascii_str.lower() def get_or_create_domain(conn, name): # Match case/accent-insensitive pour fusionner "Péage" et "peage" key = norm_domain_key(name) existing = conn.execute(text("SELECT id FROM domains")).fetchall() for r in existing: r_name = conn.execute(text("SELECT name FROM domains WHERE id=:i"), {"i": r.id}).fetchone().name if norm_domain_key(r_name) == key: return r.id code = slugify(name, 10) suffix = 0 base_code = code while conn.execute(text("SELECT 1 FROM domains WHERE code=:c"), {"c": code}).fetchone(): suffix += 1 code = (base_code[: 10 - len(str(suffix))] + str(suffix)) conn.execute(text( "INSERT INTO domains (name, code, display_order) VALUES (:n, :c, 99)" ), {"n": name, "c": code}) return conn.execute(text("SELECT id FROM domains WHERE name=:n"), {"n": name}).fetchone().id def merge_domain_duplicates(conn, dry_run=False): """Fusionne les doublons 'Flux Libre'/'flux libre', 'Péage'/'peage' etc.""" rows = conn.execute(text("SELECT id, name FROM domains ORDER BY id")).fetchall() groups = {} for r in rows: k = norm_domain_key(r.name) groups.setdefault(k, []).append((r.id, r.name)) merged = 0 for k, items in groups.items(): if len(items) <= 1: continue # Garde le plus "propre" (avec accent/capitale en priorite) items.sort(key=lambda x: (x[1] == x[1].lower(), x[0])) keeper_id, keeper_name = items[0] for dup_id, dup_name in items[1:]: print(f" [MERGE] domain {dup_name!r} (id={dup_id}) -> {keeper_name!r} (id={keeper_id})") if not dry_run: conn.execute(text( "UPDATE domain_environments SET domain_id=:k WHERE domain_id=:d" ), {"k": keeper_id, "d": dup_id}) # Supprime le doublon (si plus reference) conn.execute(text("DELETE FROM domains WHERE id=:d"), {"d": dup_id}) merged += 1 return merged def get_or_create_env(conn, name): row = conn.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": name}).fetchone() if row: return row.id code = slugify(name, 10) suffix = 0 base_code = code while conn.execute(text("SELECT 1 FROM environments WHERE code=:c"), {"c": code}).fetchone(): suffix += 1 code = (base_code[: 10 - len(str(suffix))] + str(suffix)) conn.execute(text( "INSERT INTO environments (name, code, display_order) VALUES (:n, :c, 99)" ), {"n": name, "c": code}) return conn.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": name}).fetchone().id def get_or_create_dom_env(conn, domain_id, env_id): row = conn.execute(text( "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e" ), {"d": domain_id, "e": env_id}).fetchone() if row: return row.id conn.execute(text( "INSERT INTO domain_environments (domain_id, environment_id) VALUES (:d, :e)" ), {"d": domain_id, "e": env_id}) return conn.execute(text( "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e" ), {"d": domain_id, "e": env_id}).fetchone().id def main(): parser = argparse.ArgumentParser() parser.add_argument("xlsx_path") parser.add_argument("--sheet", default="Serveurs patchables 2026") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}") print(f"[INFO] Fichier: {args.xlsx_path}") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") # 0. Fusion prealable des doublons de domaines (Flux Libre / flux libre, Peage / Péage) print("\n[INFO] Fusion doublons domains (case/accent-insensitive)...") merged = merge_domain_duplicates(conn, dry_run=args.dry_run) if merged: print(f"[INFO] {merged} doublons {'(DRY) ' if args.dry_run else ''}fusionnes") else: print("[INFO] Pas de doublon detecte") wb = openpyxl.load_workbook(args.xlsx_path, data_only=True) if args.sheet not in wb.sheetnames: print(f"[ERR] Sheet '{args.sheet}' introuvable. Sheets: {wb.sheetnames}") return ws = wb[args.sheet] header = [clean(c.value) for c in ws[1]] def col_idx(candidates): for i, h in enumerate(header): if h and any(c.lower() in h.lower() for c in candidates): return i return -1 idx_host = col_idx(["Asset Name", "Hostname"]) idx_env = col_idx(["Environnement"]) idx_dom = col_idx(["Domaine"]) idx_resp = col_idx(["Responsable Domaine"]) idx_ref = col_idx(["Référent technique", "Referent technique"]) print(f"[INFO] Indices: host={idx_host} env={idx_env} dom={idx_dom} " f"resp={idx_resp} ref={idx_ref}") if idx_host == -1: print("[ERR] Colonne Asset Name/Hostname introuvable") return stats = {"updated": 0, "dom_created": 0, "env_created": 0, "de_created": 0, "not_found": 0, "skipped": 0, "de_resp_updated": 0} seen_dom = {} seen_env = {} changes = [] would_create_dom = set() would_create_env = set() would_create_de = set() for row in ws.iter_rows(min_row=2, values_only=True): hostname = clean(row[idx_host]) if idx_host < len(row) else None if not hostname or not any(c.isalpha() for c in hostname): stats["skipped"] += 1 continue hostname = hostname.split(".")[0].lower() dom_name = clean(row[idx_dom]) if idx_dom >= 0 and idx_dom < len(row) else None env_raw = clean(row[idx_env]) if idx_env >= 0 and idx_env < len(row) else None env_name = norm_env(env_raw) # Normalisation vers 7 labels iTop resp = clean(row[idx_resp]) if idx_resp >= 0 and idx_resp < len(row) else None ref = clean(row[idx_ref]) if idx_ref >= 0 and idx_ref < len(row) else None if resp: resp = re.sub(r"\s+", " ", resp)[:100] if ref: ref = re.sub(r"\s+", " ", ref)[:100] srv = conn.execute(text("SELECT id, domain_env_id, environnement " "FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone() if not srv: stats["not_found"] += 1 continue # Auto-create domain/env/de de_id = srv.domain_env_id if dom_name and env_name: if dom_name not in seen_dom: # Match case/accent-insensitive key = norm_domain_key(dom_name) existing_id = None for r in conn.execute(text("SELECT id, name FROM domains")).fetchall(): if norm_domain_key(r.name) == key: existing_id = r.id break if existing_id: seen_dom[dom_name] = existing_id elif args.dry_run: would_create_dom.add(dom_name) seen_dom[dom_name] = -1 else: seen_dom[dom_name] = get_or_create_domain(conn, dom_name) stats["dom_created"] += 1 if env_name not in seen_env: existing = conn.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": env_name}).fetchone() if existing: seen_env[env_name] = existing.id elif args.dry_run: would_create_env.add(env_name) seen_env[env_name] = -1 else: seen_env[env_name] = get_or_create_env(conn, env_name) stats["env_created"] += 1 did = seen_dom[dom_name] eid = seen_env[env_name] if did > 0 and eid > 0: existing = conn.execute(text( "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e" ), {"d": did, "e": eid}).fetchone() if existing: de_id = existing.id elif args.dry_run: would_create_de.add((dom_name, env_name)) else: de_id = get_or_create_dom_env(conn, did, eid) stats["de_created"] += 1 if not args.dry_run and de_id: # Sync responsable/referent sur domain_environments (max 1 valeur — on garde la derniere vue) if resp or ref: up = {} if resp: up["resp"] = resp if ref: up["ref"] = ref sets = [] if "resp" in up: sets.append("responsable_nom=:resp") if "ref" in up: sets.append("referent_nom=:ref") if sets: up["id"] = de_id conn.execute(text( f"UPDATE domain_environments SET {', '.join(sets)} " f"WHERE id=:id AND (responsable_nom IS NULL OR referent_nom IS NULL)" ), up) stats["de_resp_updated"] += 1 # Update serveur: domain_env_id + environnement plain-text updates = {} if de_id and srv.domain_env_id != de_id: updates["domain_env_id"] = de_id if env_name and srv.environnement != env_name: updates["environnement"] = env_name[:50] if updates: if args.dry_run: changes.append((hostname, updates)) else: sets = ", ".join(f"{k}=:{k}" for k in updates) params = dict(updates); params["sid"] = srv.id conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"), params) stats["updated"] += 1 if args.dry_run: if would_create_dom: print(f"\n[DRY-RUN] Domaines a creer ({len(would_create_dom)}) : {sorted(would_create_dom)}") if would_create_env: print(f"[DRY-RUN] Environnements a creer ({len(would_create_env)}) : {sorted(would_create_env)}") if would_create_de: print(f"[DRY-RUN] Paires (domaine,env) a creer : {len(would_create_de)}") for d, e in sorted(would_create_de)[:20]: print(f" ({d}, {e})") if args.dry_run and changes: print(f"\n[DRY-RUN] {len(changes)} serveurs a mettre a jour (premiers 20):") for h, u in changes[:20]: print(f" {h}: {u}") print(f"\n[DONE] servers maj: {stats['updated']} | domains crees: {stats['dom_created']} " f"| envs crees: {stats['env_created']} | (dom,env) crees: {stats['de_created']} " f"| resp/ref syncs: {stats['de_resp_updated']} | hors base: {stats['not_found']} " f"| skip: {stats['skipped']}") conn.close() if __name__ == "__main__": main()