Fusionne les doublons de domaines ('Flux Libre'/'flux libre', 'Péage'/'peage'/'PeagE') en gardant la forme propre (avec accents et capitale), met à jour domain_environments.domain_id vers le domaine conservé, puis supprime les doublons.
363 lines — 14 KiB — Python
"""Alignement servers depuis fichier Excel Ayoub (Planning Patching 2026_ayoub.xlsx).

Lit le sheet 'Serveurs patchables 2026' et aligne via JOINTURE:

1. Auto-cree les domains/environments absents (valeurs iTop verbatim)
2. Auto-cree les paires domain_environments
3. Set servers.domain_env_id
4. Sync servers.environnement (plain-text pour filtre)
5. Set domain_environments.responsable_nom / referent_nom

Colonnes Excel lues:

- Asset Name -> servers.hostname (match)
- Domaine -> domains.name (code auto-genere)
- Environnement -> environments.name (code auto-genere)
- Responsable Domaine DTS -> domain_environments.responsable_nom
- Referent technique -> domain_environments.referent_nom

Usage:

    python tools/align_from_ayoub.py <fichier.xlsx> [--sheet "Serveurs patchables 2026"] [--dry-run]

Requiert openpyxl: pip install openpyxl
"""
import os
import re
import argparse
import unicodedata

from sqlalchemy import create_engine, text

# openpyxl is the only non-obvious dependency; fail fast with a helpful hint.
try:
    import openpyxl
except ImportError:
    print("[ERR] Installer openpyxl: pip install openpyxl")
    raise

# Connection string resolution order: demo DB env var, then the generic env
# var, then a hard-coded local default (demo database).
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
# Non-breaking space — common in Excel exports; normalized to a plain space.
NBSP = "\u00a0"


def clean(v):
    """Coerce a cell value to a trimmed string.

    Non-breaking spaces become regular spaces; None and whitespace-only
    values both collapse to None.
    """
    if v is None:
        return None
    normalized = str(v).replace(NBSP, " ").strip()
    if normalized:
        return normalized
    return None
# Official iTop labels used to normalize the Excel variants.
ITOP_ENVS = ["Développement", "Intégration", "Pré-Prod", "Production", "Recette", "Test", "Formation"]


def norm_env(raw):
    """Normalize 'production'/'Test 1'/'Preprod' to one of the 7 iTop labels.

    Returns the canonical label, or None when *raw* is empty or unrecognized.
    """
    if not raw:
        return None
    s = raw.strip()
    # Drop a trailing numeric suffix (Test 1 -> Test, Recette 2 -> Recette).
    s = re.sub(r"\s+\d+$", "", s)
    # Exact match against the canonical labels.
    if s in ITOP_ENVS:
        return s
    # Alias lookup key: lowercase, accent-stripped (fix: accented inputs such
    # as 'développement' or 'Pré-prod' previously missed the aliases because
    # only case/dash/space were normalized), no dashes or spaces.
    nfkd = unicodedata.normalize("NFKD", s)
    low = "".join(c for c in nfkd if not unicodedata.combining(c))
    low = low.lower().replace("-", "").replace(" ", "")
    aliases = {
        "developpement": "Développement", "dev": "Développement",
        "integration": "Intégration", "int": "Intégration",
        "preprod": "Pré-Prod", "preproduction": "Pré-Prod",
        "prod": "Production", "production": "Production",
        "recette": "Recette", "rec": "Recette",
        "test": "Test", "tests": "Test",
        "formation": "Formation",
    }
    return aliases.get(low)
def slugify(s, maxlen=10):
    """ASCII lowercase slug for codes (domain.code is varchar(10)).

    Accents are stripped via NFKD decomposition, every non-alphanumeric
    character is removed, and the result is truncated to *maxlen*.
    Returns None for empty input or an empty slug.
    """
    if not s:
        return None
    decomposed = unicodedata.normalize("NFKD", s)
    without_accents = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    slug = re.sub(r"[^a-zA-Z0-9]", "", without_accents).lower()[:maxlen]
    return slug if slug else None
def norm_domain_key(s):
    """Normalization key for a domain name: trimmed, accent-stripped, lowercase.

    Empty/None input yields "" so keys are always comparable strings.
    """
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFKD", s.strip())
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch)).lower()
def get_or_create_domain(conn, name):
    """Return the id of the domain named *name*, creating the row if absent.

    Matching is case/accent-insensitive (via norm_domain_key) so that
    'Péage' and 'peage' resolve to the same existing row. New rows get a
    unique slug code (varchar(10)); on collision a numeric suffix replaces
    the tail of the slug so the total length stays <= 10.
    """
    key = norm_domain_key(name)
    # Single scan fetching id AND name at once (was an N+1 pattern: one
    # extra SELECT per existing domain just to read its name).
    for row in conn.execute(text("SELECT id, name FROM domains")).fetchall():
        if norm_domain_key(row.name) == key:
            return row.id
    # Derive a unique code from the name.
    base_code = slugify(name, 10)
    code = base_code
    suffix = 0
    while conn.execute(text("SELECT 1 FROM domains WHERE code=:c"),
                       {"c": code}).fetchone():
        suffix += 1
        code = base_code[: 10 - len(str(suffix))] + str(suffix)
    conn.execute(text(
        "INSERT INTO domains (name, code, display_order) VALUES (:n, :c, 99)"
    ), {"n": name, "c": code})
    return conn.execute(text("SELECT id FROM domains WHERE name=:n"),
                        {"n": name}).fetchone().id
def merge_domain_duplicates(conn, dry_run=False):
    """Merge duplicate domains such as 'Flux Libre'/'flux libre', 'Péage'/'peage'.

    Rows are grouped by the case/accent-insensitive key; for each group the
    "cleanest" name is kept, every domain_environments row is re-pointed to
    the keeper, and the duplicate domain rows are deleted. Returns the number
    of duplicates merged (counted in dry-run mode too).
    """
    rows = conn.execute(text("SELECT id, name FROM domains ORDER BY id")).fetchall()
    groups = {}
    for r in rows:
        groups.setdefault(norm_domain_key(r.name), []).append((r.id, r.name))
    merged = 0
    for items in groups.values():
        if len(items) <= 1:
            continue

        # Keeper ranking: capitalized before all-lowercase, accented before
        # plain ASCII ('Péage' beats 'Peage'), then lowest id as tie-break.
        # Fix: the previous key only penalized all-lowercase names, so a
        # plain-ASCII 'Peage' with a lower id could beat 'Péage' despite the
        # stated "accent en priorite" intent.
        def rank(item):
            _id, nm = item
            has_accent = any(unicodedata.combining(c)
                             for c in unicodedata.normalize("NFKD", nm))
            return (nm == nm.lower(), not has_accent, _id)

        items.sort(key=rank)
        keeper_id, keeper_name = items[0]
        for dup_id, dup_name in items[1:]:
            print(f" [MERGE] domain {dup_name!r} (id={dup_id}) -> {keeper_name!r} (id={keeper_id})")
            if not dry_run:
                # NOTE(review): if the keeper already has a row for the same
                # environment_id, this UPDATE could violate a unique
                # (domain_id, environment_id) constraint — confirm schema.
                conn.execute(text(
                    "UPDATE domain_environments SET domain_id=:k WHERE domain_id=:d"
                ), {"k": keeper_id, "d": dup_id})
                # Delete the duplicate (once no longer referenced).
                conn.execute(text("DELETE FROM domains WHERE id=:d"), {"d": dup_id})
            merged += 1
    return merged
def get_or_create_env(conn, name):
    """Return the id of the environment named *name*, inserting it on first use.

    New rows get a unique slug code (varchar(10)); a numeric suffix replaces
    the slug's tail on collision so the length stays <= 10.
    """
    found = conn.execute(text("SELECT id FROM environments WHERE name=:n"),
                         {"n": name}).fetchone()
    if found:
        return found.id
    base = slugify(name, 10)
    candidate = base
    counter = 0
    while conn.execute(text("SELECT 1 FROM environments WHERE code=:c"),
                       {"c": candidate}).fetchone():
        counter += 1
        candidate = base[: 10 - len(str(counter))] + str(counter)
    conn.execute(text(
        "INSERT INTO environments (name, code, display_order) VALUES (:n, :c, 99)"
    ), {"n": name, "c": candidate})
    return conn.execute(text("SELECT id FROM environments WHERE name=:n"),
                        {"n": name}).fetchone().id
def get_or_create_dom_env(conn, domain_id, env_id):
    """Return the id of the (domain, environment) pair, inserting it if absent."""
    lookup = text(
        "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e"
    )
    params = {"d": domain_id, "e": env_id}
    found = conn.execute(lookup, params).fetchone()
    if found is None:
        conn.execute(text(
            "INSERT INTO domain_environments (domain_id, environment_id) VALUES (:d, :e)"
        ), params)
        found = conn.execute(lookup, params).fetchone()
    return found.id
def main():
    """Entry point: merge duplicate domains, then align servers from the Excel file.

    Reads the target sheet, matches each row's hostname against servers,
    auto-creates missing domains/environments/(domain, environment) pairs,
    and updates servers.domain_env_id / servers.environnement. With
    --dry-run, nothing is written; planned changes are printed instead.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("xlsx_path")
    parser.add_argument("--sheet", default="Serveurs patchables 2026")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
    print(f"[INFO] Fichier: {args.xlsx_path}")

    # AUTOCOMMIT: every statement is applied immediately (no transaction to roll back).
    conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")

    # 0. Pre-merge duplicate domains (Flux Libre / flux libre, Peage / Péage).
    print("\n[INFO] Fusion doublons domains (case/accent-insensitive)...")
    merged = merge_domain_duplicates(conn, dry_run=args.dry_run)
    if merged:
        print(f"[INFO] {merged} doublons {'(DRY) ' if args.dry_run else ''}fusionnes")
    else:
        print("[INFO] Pas de doublon detecte")

    # data_only=True: read computed cell values, not formulas.
    wb = openpyxl.load_workbook(args.xlsx_path, data_only=True)
    if args.sheet not in wb.sheetnames:
        print(f"[ERR] Sheet '{args.sheet}' introuvable. Sheets: {wb.sheetnames}")
        return
    ws = wb[args.sheet]

    # First row holds the column headers.
    header = [clean(c.value) for c in ws[1]]

    def col_idx(candidates):
        # Index of the first header containing any candidate (case-insensitive), else -1.
        for i, h in enumerate(header):
            if h and any(c.lower() in h.lower() for c in candidates):
                return i
        return -1

    idx_host = col_idx(["Asset Name", "Hostname"])
    idx_env = col_idx(["Environnement"])
    idx_dom = col_idx(["Domaine"])
    idx_resp = col_idx(["Responsable Domaine"])
    idx_ref = col_idx(["Référent technique", "Referent technique"])

    print(f"[INFO] Indices: host={idx_host} env={idx_env} dom={idx_dom} "
          f"resp={idx_resp} ref={idx_ref}")

    # The hostname column is mandatory; everything else is best-effort.
    if idx_host == -1:
        print("[ERR] Colonne Asset Name/Hostname introuvable")
        return

    stats = {"updated": 0, "dom_created": 0, "env_created": 0, "de_created": 0,
             "not_found": 0, "skipped": 0, "de_resp_updated": 0}
    # Per-run caches: name -> id (-1 is the dry-run "would create" sentinel).
    seen_dom = {}
    seen_env = {}
    changes = []
    would_create_dom = set()
    would_create_env = set()
    would_create_de = set()

    for row in ws.iter_rows(min_row=2, values_only=True):
        hostname = clean(row[idx_host]) if idx_host < len(row) else None
        # Skip blank rows and cells with no letters (separators, counters...).
        if not hostname or not any(c.isalpha() for c in hostname):
            stats["skipped"] += 1
            continue
        # Short hostname, lowercase (strip any DNS suffix).
        hostname = hostname.split(".")[0].lower()

        dom_name = clean(row[idx_dom]) if idx_dom >= 0 and idx_dom < len(row) else None
        env_raw = clean(row[idx_env]) if idx_env >= 0 and idx_env < len(row) else None
        env_name = norm_env(env_raw)  # Normalization to the 7 iTop labels
        resp = clean(row[idx_resp]) if idx_resp >= 0 and idx_resp < len(row) else None
        ref = clean(row[idx_ref]) if idx_ref >= 0 and idx_ref < len(row) else None
        # Collapse internal whitespace and cap at the column size (100).
        if resp:
            resp = re.sub(r"\s+", " ", resp)[:100]
        if ref:
            ref = re.sub(r"\s+", " ", ref)[:100]

        srv = conn.execute(text("SELECT id, domain_env_id, environnement "
                                "FROM servers WHERE hostname=:h"),
                           {"h": hostname}).fetchone()
        if not srv:
            stats["not_found"] += 1
            continue

        # Auto-create domain / environment / (domain, environment) pair.
        de_id = srv.domain_env_id
        if dom_name and env_name:
            if dom_name not in seen_dom:
                # Case/accent-insensitive match against existing domains.
                key = norm_domain_key(dom_name)
                existing_id = None
                for r in conn.execute(text("SELECT id, name FROM domains")).fetchall():
                    if norm_domain_key(r.name) == key:
                        existing_id = r.id
                        break
                if existing_id:
                    seen_dom[dom_name] = existing_id
                elif args.dry_run:
                    would_create_dom.add(dom_name)
                    seen_dom[dom_name] = -1
                else:
                    seen_dom[dom_name] = get_or_create_domain(conn, dom_name)
                    stats["dom_created"] += 1

            if env_name not in seen_env:
                existing = conn.execute(text("SELECT id FROM environments WHERE name=:n"),
                                        {"n": env_name}).fetchone()
                if existing:
                    seen_env[env_name] = existing.id
                elif args.dry_run:
                    would_create_env.add(env_name)
                    seen_env[env_name] = -1
                else:
                    seen_env[env_name] = get_or_create_env(conn, env_name)
                    stats["env_created"] += 1

            did = seen_dom[dom_name]
            eid = seen_env[env_name]
            # Both resolved to real ids (-1 = dry-run placeholder).
            if did > 0 and eid > 0:
                existing = conn.execute(text(
                    "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e"
                ), {"d": did, "e": eid}).fetchone()
                if existing:
                    de_id = existing.id
                elif args.dry_run:
                    would_create_de.add((dom_name, env_name))
                else:
                    de_id = get_or_create_dom_env(conn, did, eid)
                    stats["de_created"] += 1
                if not args.dry_run and de_id:
                    # Sync responsable/referent on domain_environments
                    # (single value per pair — last one seen wins).
                    if resp or ref:
                        up = {}
                        if resp:
                            up["resp"] = resp
                        if ref:
                            up["ref"] = ref
                        sets = []
                        if "resp" in up:
                            sets.append("responsable_nom=:resp")
                        if "ref" in up:
                            sets.append("referent_nom=:ref")
                        if sets:
                            up["id"] = de_id
                            # Only rows still missing a value are touched.
                            # NOTE(review): with both columns set this skips the
                            # update, which conflicts with "last one wins" above
                            # — confirm intended precedence.
                            conn.execute(text(
                                f"UPDATE domain_environments SET {', '.join(sets)} "
                                f"WHERE id=:id AND (responsable_nom IS NULL OR referent_nom IS NULL)"
                            ), up)
                            stats["de_resp_updated"] += 1

        # Update the server: domain_env_id + plain-text environnement.
        updates = {}
        if de_id and srv.domain_env_id != de_id:
            updates["domain_env_id"] = de_id
        if env_name and srv.environnement != env_name:
            updates["environnement"] = env_name[:50]

        if updates:
            if args.dry_run:
                changes.append((hostname, updates))
            else:
                sets = ", ".join(f"{k}=:{k}" for k in updates)
                params = dict(updates); params["sid"] = srv.id
                conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"), params)
                stats["updated"] += 1

    # Dry-run report: everything that WOULD have been created/updated.
    if args.dry_run:
        if would_create_dom:
            print(f"\n[DRY-RUN] Domaines a creer ({len(would_create_dom)}) : {sorted(would_create_dom)}")
        if would_create_env:
            print(f"[DRY-RUN] Environnements a creer ({len(would_create_env)}) : {sorted(would_create_env)}")
        if would_create_de:
            print(f"[DRY-RUN] Paires (domaine,env) a creer : {len(would_create_de)}")
            for d, e in sorted(would_create_de)[:20]:
                print(f" ({d}, {e})")
    if args.dry_run and changes:
        print(f"\n[DRY-RUN] {len(changes)} serveurs a mettre a jour (premiers 20):")
        for h, u in changes[:20]:
            print(f" {h}: {u}")

    print(f"\n[DONE] servers maj: {stats['updated']} | domains crees: {stats['dom_created']} "
          f"| envs crees: {stats['env_created']} | (dom,env) crees: {stats['de_created']} "
          f"| resp/ref syncs: {stats['de_resp_updated']} | hors base: {stats['not_found']} "
          f"| skip: {stats['skipped']}")

    conn.close()
# Script entry point.
if __name__ == "__main__":
    main()