"""Lie servers -> applications en croisant les deux fichiers Excel + contacts. Pour chaque hostname vu dans les fichiers : - Source 1 (poids 3) : Ayoub/Serveurs patchables 2026 col 'Logiciel->Nom complet' - Source 2 (poids 2) : Plan/Histo-2025 col 'Logiciel->Nom complet' - Source 3 (poids 1) : Plan/Sxx col 'Logiciel->Nom complet' Matche le nom agrege (score max) avec applications.nom_court ou nom_complet (case/accent-insensitive). Set servers.application_id. Usage: python tools/link_servers_applications.py --ayoub --patching [--dry-run] [--overwrite] """ import os import re import argparse import unicodedata from collections import defaultdict from sqlalchemy import create_engine, text try: import openpyxl except ImportError: print("[ERR] pip install openpyxl") raise DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" def norm(s): if not s: return "" nfkd = unicodedata.normalize("NFKD", s.strip().lower()) a = "".join(c for c in nfkd if not unicodedata.combining(c)) return " ".join(a.replace("\xa0", " ").split()) def clean(v): if v is None: return None s = str(v).replace("\xa0", " ").strip() return s or None def resolve_header(ws, candidates): header = [clean(c.value) or "" for c in ws[1]] for i, h in enumerate(header): for cand in candidates: if cand.lower() in h.lower(): return i return -1 def collect_from_sheet(ws, col_host, col_app, weight, scores, label): added = 0 for row in ws.iter_rows(min_row=2, values_only=True): h = clean(row[col_host]) if col_host < len(row) else None if not h: continue h = h.split(".")[0].lower() if not any(c.isalpha() for c in h): continue app = clean(row[col_app]) if col_app >= 0 and col_app < len(row) else None if not app: continue scores[h][app] += weight added += 1 print(f" [{label}] +{added} observations (poids {weight})") def main(): parser = argparse.ArgumentParser() parser.add_argument("--ayoub", default=None) parser.add_argument("--patching", default=None) parser.add_argument("--dry-run", action="store_true") parser.add_argument("--overwrite", action="store_true") args = parser.parse_args() scores = defaultdict(lambda: defaultdict(int)) if args.ayoub and os.path.exists(args.ayoub): print(f"[INFO] {args.ayoub}") wb = openpyxl.load_workbook(args.ayoub, data_only=True) if "Serveurs patchables 2026" in wb.sheetnames: ws = wb["Serveurs patchables 2026"] col_h = resolve_header(ws, ["Asset Name", "Hostname"]) col_a = resolve_header(ws, ["Logiciel->Nom complet", "Logiciel"]) if col_h >= 0 and col_a >= 0: collect_from_sheet(ws, col_h, col_a, 3, scores, "ayoub") if args.patching and os.path.exists(args.patching): print(f"[INFO] {args.patching}") wb = openpyxl.load_workbook(args.patching, data_only=True) for sheet in ("Histo-2025", "Histo_2025"): if sheet in wb.sheetnames: ws = wb[sheet] col_h = resolve_header(ws, ["Asset Name", "Hostname"]) col_a = resolve_header(ws, ["Logiciel->Nom complet", "Logiciel"]) if col_h >= 0 and col_a >= 0: collect_from_sheet(ws, col_h, col_a, 2, scores, sheet) break for sn in wb.sheetnames: if re.match(r"S\d{1,2}$", sn, re.IGNORECASE): ws = wb[sn] col_h = resolve_header(ws, ["Asset Name", "Hostname"]) col_a = resolve_header(ws, ["Logiciel->Nom complet", "Logiciel"]) if col_h >= 0 and col_a >= 0: collect_from_sheet(ws, col_h, col_a, 1, scores, sn) engine = create_engine(DATABASE_URL) conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") # Index applications : norm(nom_court) -> id, norm(nom_complet) -> id app_by_short = {} app_by_long = {} for r in conn.execute(text("SELECT id, nom_court, nom_complet FROM applications")).fetchall(): if r.nom_court: k = norm(r.nom_court) if k and k not in app_by_short: app_by_short[k] = r.id if r.nom_complet: k = norm(r.nom_complet) if k and k not in app_by_long: app_by_long[k] = r.id hosts = {r.hostname.lower(): (r.id, r.application_id) for r in conn.execute(text("SELECT id, hostname, application_id FROM servers")).fetchall()} stats = {"updated": 0, "unchanged": 0, "no_app_match": 0, "no_server": 0} unmatched_apps = defaultdict(int) for host, apps in scores.items(): entry = hosts.get(host) if not entry: stats["no_server"] += 1 continue sid, curr_app_id = entry best_name, best_score = max(apps.items(), key=lambda x: x[1]) k = norm(best_name) app_id = app_by_long.get(k) or app_by_short.get(k) if not app_id: # essaie 1er mot + match partiel first = k.split()[0] if k else "" for n, aid in app_by_long.items(): if first and first in n: app_id = aid break if not app_id: unmatched_apps[best_name] += 1 stats["no_app_match"] += 1 continue if curr_app_id == app_id and not args.overwrite: stats["unchanged"] += 1 continue if curr_app_id and not args.overwrite: stats["unchanged"] += 1 continue if args.dry_run: print(f" DRY: {host:25s} -> app_id={app_id} ({best_name[:50]}, score={best_score})") else: conn.execute(text( "UPDATE servers SET application_id=:a, application_name=:n WHERE id=:sid" ), {"a": app_id, "n": best_name[:100], "sid": sid}) stats["updated"] += 1 conn.close() print(f"\n[DONE] Liens maj: {stats['updated']} | Inchanges: {stats['unchanged']} " f"| Sans match app: {stats['no_app_match']} | Hors base: {stats['no_server']}") if unmatched_apps: print(f"\n[WARN] Apps Excel sans match catalogue ({len(unmatched_apps)}, top 10):") for n, c in sorted(unmatched_apps.items(), key=lambda x: -x[1])[:10]: print(f" {c:4d} {n[:80]}") if __name__ == "__main__": main()