"""Fill servers.responsable_nom / referent_nom by weighted-vote aggregation.

Sources examined (by priority, decreasing weight):
  1. Planning Patching 2026_ayoub.xlsx - sheet 'Serveurs patchables 2026' (weight 3)
       columns 'Asset Name' / 'Responsable Domaine DTS' / 'Referent technique'
  2. Plan de Patching serveurs 2026.xlsx - sheet 'Histo-2025' (weight 2)
       columns 'Asset Name' / 'Responsable Domaine DTS' / 'Referent technique'
  3. Plan de Patching serveurs 2026.xlsx - weekly sheets S02..S52 (weight 1 each)
       col 0 (Asset Name) / col 7 (Responsable Domaine DTS) / col 9 (Referent technique);
       the code locates these columns by header text, not by fixed position
  4. domain_environments.responsable_nom / referent_nom (fallback, weight 1)

Algorithm, per hostname:
  - Aggregate every occurrence (name, role, weight).
  - For each role (responsable / referent): score = sum(weights).
  - Keep the name with the highest score.
  - Look the name up in contacts (case/accent-insensitive) and use the
    canonical spelling when a match exists.
  - UPDATE servers.responsable_nom / referent_nom when the field is empty
    (or always, with --overwrite).

Usage:
    python tools/fill_responsables_by_probability.py [--ayoub PATH]
        [--patching PATH] [--dry-run] [--overwrite]
"""
import os
import re
import argparse
import unicodedata
from collections import defaultdict

from sqlalchemy import create_engine, text

try:
    import openpyxl
except ImportError:
    print("[ERR] pip install openpyxl")
    raise

# NOTE(review): the last fallback embeds a credential in source control.
# It is kept so existing invocations keep working, but it should be moved to
# the environment / a secret store.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)


def norm_name(s):
    """Return *s* lowercased, stripped of combining accents, spaces collapsed.

    Falsy input (None, "") yields "".
    """
    if not s:
        return ""
    nfkd = unicodedata.normalize("NFKD", s.strip().lower())
    ascii_str = "".join(c for c in nfkd if not unicodedata.combining(c))
    return " ".join(ascii_str.replace("\xa0", " ").split())


def clean(v):
    """Return str(v) with NBSP replaced by spaces and stripped, or None if empty."""
    if v is None:
        return None
    s = str(v).replace("\xa0", " ").strip()
    return s or None


def collect_from_sheet(ws, col_host, col_resp, col_ref, weight, scores, source_label):
    """Accumulate votes from one worksheet into *scores*.

    For every data row (row 1 is treated as the header), does
    ``scores[hostname][role][name] += weight`` for each non-empty name.

    Args:
        ws: worksheet exposing ``iter_rows(min_row=..., values_only=True)``.
        col_host: 0-based hostname column, or -1 when it was not found.
        col_resp: 0-based "responsable" column, or -1 to skip that role.
        col_ref: 0-based "referent" column, or -1 to skip that role.
        weight: score added per observation.
        scores: nested mapping mutated in place.
        source_label: tag used in the progress message.
    """
    if col_host < 0:
        # A negative index would silently read the LAST column as hostname.
        print(f" [{source_label}] colonne hostname introuvable, feuille ignoree")
        return

    role_columns = [
        (role, col)
        for role, col in (("responsable", col_resp), ("referent", col_ref))
        if col >= 0
    ]

    added = 0
    for row in ws.iter_rows(min_row=2, values_only=True):
        host = clean(row[col_host]) if col_host < len(row) else None
        if not host:
            continue
        # Keep the short name only (drop any domain suffix).
        host = host.split(".")[0].lower()
        # Rows whose "hostname" has no letter are skipped.
        if not any(ch.isalpha() for ch in host):
            continue
        for role, col in role_columns:
            if col >= len(row):
                continue
            name = clean(row[col])
            if name:
                scores[host][role][re.sub(r"\s+", " ", name)] += weight
                added += 1
    print(f" [{source_label}] +{added} observations (weight {weight})")


def resolve_header(ws, candidates):
    """Return the 0-based index of the first header matching *candidates*.

    Candidates are tried in the order given (most preferred first); a match is
    a case-insensitive substring test against the cells of row 1. Returns -1
    when nothing matches.
    """
    header = [(clean(cell.value) or "").lower() for cell in ws[1]]
    for cand in candidates:
        needle = cand.lower()
        for idx, title in enumerate(header):
            if needle in title:
                return idx
    return -1


def _open_workbook(path):
    """Load the workbook at *path*; return None (with a warning) if unusable."""
    if not path:
        return None
    if not os.path.exists(path):
        print(f"[WARN] Fichier introuvable, source ignoree : {path}")
        return None
    print(f"[INFO] Lecture {path}")
    return openpyxl.load_workbook(path, data_only=True)


def _pick_update(votes, current, overwrite, canonical):
    """Return the name to write for one role, or None to leave the field alone.

    Args:
        votes: mapping name -> cumulated weight for this role.
        current: value currently stored in the database (may be None).
        overwrite: when False, a non-blank *current* is never replaced.
        canonical: callable mapping a raw name to its canonical spelling.
    """
    if not votes:
        return None
    if not overwrite and (current or "").strip():
        return None
    best, _score = max(votes.items(), key=lambda item: item[1])
    new_name = canonical(best)[:200]
    return new_name if new_name != current else None


def main():
    """Parse CLI arguments, aggregate all sources and update the servers table."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--ayoub", default=None,
                        help="Chemin Planning Patching 2026_ayoub.xlsx")
    parser.add_argument("--patching", default=None,
                        help="Chemin Plan de Patching serveurs 2026.xlsx")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--overwrite", action="store_true")
    args = parser.parse_args()

    # scores[hostname][role][name] = cumulated weight
    scores = defaultdict(lambda: {"responsable": defaultdict(int),
                                  "referent": defaultdict(int)})

    # Source 1: Ayoub workbook
    wb = _open_workbook(args.ayoub)
    if wb is not None:
        sheet = "Serveurs patchables 2026"
        if sheet in wb.sheetnames:
            ws = wb[sheet]
            col_h = resolve_header(ws, ["Asset Name", "Hostname"])
            col_r = resolve_header(ws, ["Responsable Domaine"])
            col_t = resolve_header(ws, ["Referent technique", "Référent technique"])
            collect_from_sheet(ws, col_h, col_r, col_t, 3, scores,
                               "ayoub/Serveurs patchables")

    # Sources 2 + 3: patching plan (Histo-2025 + weekly sheets)
    wb = _open_workbook(args.patching)
    if wb is not None:
        # Histo-2025 (first spelling found wins)
        for sheet in ("Histo-2025", "Histo_2025"):
            if sheet in wb.sheetnames:
                ws = wb[sheet]
                col_h = resolve_header(ws, ["Asset Name", "Hostname", "Nom"])
                col_r = resolve_header(ws, ["Responsable Domaine"])
                col_t = resolve_header(
                    ws, ["Referent technique", "Référent technique", "Administrateur"])
                collect_from_sheet(ws, col_h, col_r, col_t, 2, scores, sheet)
                break
        # Weekly sheets: names made of "S" followed by one or two digits
        for sheet_name in wb.sheetnames:
            if re.match(r"S\d{1,2}$", sheet_name, re.IGNORECASE):
                ws = wb[sheet_name]
                col_h = resolve_header(ws, ["Asset Name", "Hostname"])
                col_r = resolve_header(ws, ["Responsable Domaine"])
                col_t = resolve_header(ws, ["Referent technique", "Référent technique"])
                collect_from_sheet(ws, col_h, col_r, col_t, 1, scores, sheet_name)

    stats = {"updated": 0, "unchanged": 0, "no_server": 0}

    engine = create_engine(DATABASE_URL)
    # The context manager closes the connection even if a query fails.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        # Source 4: fallback from domain_environments
        print("[INFO] Fallback via domain_environments...")
        rows = conn.execute(text("""
            SELECT s.hostname, de.responsable_nom, de.referent_nom
            FROM servers s
            JOIN domain_environments de ON s.domain_env_id = de.id
        """)).fetchall()
        for r in rows:
            if not r.hostname:
                continue
            # NOTE(review): sheet hostnames are shortened at the first ".",
            # database hostnames are only lowercased - confirm the servers
            # table stores short names.
            h = r.hostname.lower()
            for role, raw in (("responsable", r.responsable_nom),
                              ("referent", r.referent_nom)):
                name = clean(raw)
                if name:
                    scores[h][role][re.sub(r"\s+", " ", name)] += 1

        # Contact index used to validate / canonicalise the winning names
        contact_canon = {}  # norm_name -> canonical spelling (first seen wins)
        for c in conn.execute(text("SELECT name FROM contacts")).fetchall():
            key = norm_name(c.name)
            if key and key not in contact_canon:
                contact_canon[key] = c.name

        def canonical(n):
            """Return the spelling stored in contacts, or *n* unchanged."""
            return contact_canon.get(norm_name(n), n)

        # One pass over servers: hostname -> (id, responsable_nom, referent_nom)
        servers = {}
        for r in conn.execute(text(
                "SELECT id, hostname, responsable_nom, referent_nom FROM servers"
        )).fetchall():
            if r.hostname:
                servers[r.hostname.lower()] = (r.id, r.responsable_nom, r.referent_nom)

        # Final decision per hostname
        for host, roles in scores.items():
            entry = servers.get(host)
            if entry is None:
                stats["no_server"] += 1
                continue
            sid, curr_resp, curr_ref = entry

            updates = {}
            new_resp = _pick_update(roles["responsable"], curr_resp,
                                    args.overwrite, canonical)
            if new_resp is not None:
                updates["responsable_nom"] = new_resp
            new_ref = _pick_update(roles["referent"], curr_ref,
                                   args.overwrite, canonical)
            if new_ref is not None:
                updates["referent_nom"] = new_ref

            if not updates:
                stats["unchanged"] += 1
                continue

            if args.dry_run:
                print(f" DRY: {host:25s} {updates}")
            else:
                # Column names come from the fixed keys above, never from
                # user data; values are bound parameters.
                sets = ", ".join(f"{k}=:{k}" for k in updates)
                params = {**updates, "sid": sid}
                conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"), params)
            # Counted in dry-run too, so the summary shows would-be updates.
            stats["updated"] += 1

    print(f"\n[DONE] Maj: {stats['updated']} | Inchanges: {stats['unchanged']} "
          f"| Hors base: {stats['no_server']}")


if __name__ == "__main__":
    main()