From 41f5e07e72168523b17ac4a4297ffad2e80447ed Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Tue, 14 Apr 2026 22:56:57 +0200 Subject: [PATCH] Add link_servers_applications: lie servers->applications depuis 2 Excel (ponderation 3/2/1) --- tools/link_servers_applications.py | 181 +++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 tools/link_servers_applications.py diff --git a/tools/link_servers_applications.py b/tools/link_servers_applications.py new file mode 100644 index 0000000..ef5bb8b --- /dev/null +++ b/tools/link_servers_applications.py @@ -0,0 +1,181 @@ +"""Lie servers -> applications en croisant les deux fichiers Excel + contacts. + +Pour chaque hostname vu dans les fichiers : + - Source 1 (poids 3) : Ayoub/Serveurs patchables 2026 col 'Logiciel->Nom complet' + - Source 2 (poids 2) : Plan/Histo-2025 col 'Logiciel->Nom complet' + - Source 3 (poids 1) : Plan/Sxx col 'Logiciel->Nom complet' + +Matche le nom agrege (score max) avec applications.nom_court ou nom_complet +(case/accent-insensitive). Set servers.application_id. + +Usage: + python tools/link_servers_applications.py --ayoub --patching [--dry-run] [--overwrite] +""" +import os +import re +import argparse +import unicodedata +from collections import defaultdict +from sqlalchemy import create_engine, text + +try: + import openpyxl +except ImportError: + print("[ERR] pip install openpyxl") + raise + +DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \ + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo" + + +def norm(s): + if not s: + return "" + nfkd = unicodedata.normalize("NFKD", s.strip().lower()) + a = "".join(c for c in nfkd if not unicodedata.combining(c)) + return " ".join(a.replace("\xa0", " ").split()) + + +def clean(v): + if v is None: + return None + s = str(v).replace("\xa0", " ").strip() + return s or None + + +def resolve_header(ws, candidates): + header = [clean(c.value) or "" for c in ws[1]] + for i, h in enumerate(header): + for cand in candidates: + if cand.lower() in h.lower(): + return i + return -1 + + +def collect_from_sheet(ws, col_host, col_app, weight, scores, label): + added = 0 + for row in ws.iter_rows(min_row=2, values_only=True): + h = clean(row[col_host]) if col_host < len(row) else None + if not h: + continue + h = h.split(".")[0].lower() + if not any(c.isalpha() for c in h): + continue + app = clean(row[col_app]) if col_app >= 0 and col_app < len(row) else None + if not app: + continue + scores[h][app] += weight + added += 1 + print(f" [{label}] +{added} observations (poids {weight})") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--ayoub", default=None) + parser.add_argument("--patching", default=None) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--overwrite", action="store_true") + args = parser.parse_args() + + scores = defaultdict(lambda: defaultdict(int)) + + if args.ayoub and os.path.exists(args.ayoub): + print(f"[INFO] {args.ayoub}") + wb = openpyxl.load_workbook(args.ayoub, data_only=True) + if "Serveurs patchables 2026" in wb.sheetnames: + ws = wb["Serveurs patchables 2026"] + col_h = resolve_header(ws, ["Asset Name", "Hostname"]) + col_a = resolve_header(ws, ["Logiciel->Nom complet", "Logiciel"]) + if col_h >= 0 and col_a >= 0: + collect_from_sheet(ws, col_h, col_a, 3, scores, "ayoub") + + if args.patching and os.path.exists(args.patching): + print(f"[INFO] {args.patching}") + wb = openpyxl.load_workbook(args.patching, data_only=True) + for sheet in ("Histo-2025", "Histo_2025"): + if sheet in wb.sheetnames: + ws = wb[sheet] + col_h = resolve_header(ws, ["Asset Name", "Hostname"]) + col_a = resolve_header(ws, ["Logiciel->Nom complet", "Logiciel"]) + if col_h >= 0 and col_a >= 0: + collect_from_sheet(ws, col_h, col_a, 2, scores, sheet) + break + for sn in wb.sheetnames: + if re.match(r"S\d{1,2}$", sn, re.IGNORECASE): + ws = wb[sn] + col_h = resolve_header(ws, ["Asset Name", "Hostname"]) + col_a = resolve_header(ws, ["Logiciel->Nom complet", "Logiciel"]) + if col_h >= 0 and col_a >= 0: + collect_from_sheet(ws, col_h, col_a, 1, scores, sn) + + engine = create_engine(DATABASE_URL) + conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") + + # Index applications : norm(nom_court) -> id, norm(nom_complet) -> id + app_by_short = {} + app_by_long = {} + for r in conn.execute(text("SELECT id, nom_court, nom_complet FROM applications")).fetchall(): + if r.nom_court: + k = norm(r.nom_court) + if k and k not in app_by_short: + app_by_short[k] = r.id + if r.nom_complet: + k = norm(r.nom_complet) + if k and k not in app_by_long: + app_by_long[k] = r.id + + hosts = {r.hostname.lower(): (r.id, r.application_id) for r in + conn.execute(text("SELECT id, hostname, application_id FROM servers")).fetchall()} + + stats = {"updated": 0, "unchanged": 0, "no_app_match": 0, "no_server": 0} + unmatched_apps = defaultdict(int) + + for host, apps in scores.items(): + entry = hosts.get(host) + if not entry: + stats["no_server"] += 1 + continue + sid, curr_app_id = entry + + best_name, best_score = max(apps.items(), key=lambda x: x[1]) + k = norm(best_name) + app_id = app_by_long.get(k) or app_by_short.get(k) + if not app_id: + # essaie 1er mot + match partiel + first = k.split()[0] if k else "" + for n, aid in app_by_long.items(): + if first and first in n: + app_id = aid + break + + if not app_id: + unmatched_apps[best_name] += 1 + stats["no_app_match"] += 1 + continue + + if curr_app_id == app_id and not args.overwrite: + stats["unchanged"] += 1 + continue + if curr_app_id and not args.overwrite: + stats["unchanged"] += 1 + continue + + if args.dry_run: + print(f" DRY: {host:25s} -> app_id={app_id} ({best_name[:50]}, score={best_score})") + else: + conn.execute(text( + "UPDATE servers SET application_id=:a, application_name=:n WHERE id=:sid" + ), {"a": app_id, "n": best_name[:100], "sid": sid}) + stats["updated"] += 1 + + conn.close() + print(f"\n[DONE] Liens maj: {stats['updated']} | Inchanges: {stats['unchanged']} " + f"| Sans match app: {stats['no_app_match']} | Hors base: {stats['no_server']}") + if unmatched_apps: + print(f"\n[WARN] Apps Excel sans match catalogue ({len(unmatched_apps)}, top 10):") + for n, c in sorted(unmatched_apps.items(), key=lambda x: -x[1])[:10]: + print(f" {c:4d} {n[:80]}") + + +if __name__ == "__main__": + main()