patchcenter/tools/fill_responsables_by_probability.py
Admin MPCZ fe6b05353c Add fill_responsables_by_probability: agregation ponderee des 2 Excel + contacts + domain_env
Source 1 (poids 3): Ayoub/Serveurs patchables 2026
Source 2 (poids 2): Plan de Patching/Histo-2025
Source 3 (poids 1 chacune): Plan de Patching/S02..S52 weekly
Source 4 (poids 1): fallback domain_environments

Par hostname et par role (responsable/referent), retient le nom au score max.
Canonicalise via contacts.name (case/accent-insensitive) avant ecriture.
2026-04-14 22:49:29 +02:00

217 lines
8.6 KiB
Python

"""Remplit servers.responsable_nom / referent_nom par agregation probabiliste.
Sources examinees (par priorite, poids decroissant) :
1. Planning Patching 2026_ayoub.xlsx - sheet 'Serveurs patchables 2026' (poids 3)
- Col 'Asset Name' / 'Responsable Domaine DTS' / 'Referent technique'
2. Plan de Patching serveurs 2026.xlsx - sheet 'Histo-2025' (poids 2)
- Col 'Asset Name' / 'Responsable Domaine DTS' / 'Referent technique'
3. Plan de Patching serveurs 2026.xlsx - sheets hebdo S02..S52 (poids 1 chacune)
- Col 0 (Asset Name) / Col 7 (Responsable Domaine DTS) / Col 9 (Referent technique)
4. domain_environments.responsable_nom / referent_nom (fallback, poids 1)
Algorithme par hostname :
- Agrege toutes les occurrences (name, role, weight)
- Pour chaque role (responsable / referent) : score = sum(weights)
- Retient le nom avec le score max
- Verifie existence dans contacts (case/accent-insensitive). Garde le nom canonique.
- UPDATE servers.responsable_nom / referent_nom si champ vide (sauf --overwrite)
Usage:
python tools/fill_responsables_by_probability.py [--ayoub <xlsx>] [--patching <xlsx>] [--dry-run] [--overwrite]
"""
import os
import re
import argparse
import unicodedata
from collections import defaultdict
from sqlalchemy import create_engine, text
try:
import openpyxl
except ImportError:
print("[ERR] pip install openpyxl")
raise
# Connection URL for the target database. DATABASE_URL_DEMO takes precedence
# over DATABASE_URL; the literal below is used only when neither environment
# variable is set (or both are empty strings).
# NOTE(review): the fallback embeds a password in source — consider requiring
# the environment variable instead of shipping a credential here.
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
def norm_name(s):
    """Build a comparison key: lowercase, accents removed, whitespace collapsed.

    Falsy input (``None``, ``""``) yields ``""``.
    """
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFKD", s.strip().lower())
    # After NFKD, accents are separate combining code points; keep only the
    # base characters (combining class 0).
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    flattened = "".join(base_chars).replace("\xa0", " ")
    return " ".join(flattened.split())
def clean(v):
    """Stringify a cell value and trim it; return None for missing or blank cells.

    Non-breaking spaces are turned into regular spaces before trimming.
    """
    if v is None:
        return None
    trimmed = str(v).replace("\xa0", " ").strip()
    return trimmed if trimmed else None
def collect_from_sheet(ws, col_host, col_resp, col_ref, weight, scores, source_label):
    """Accumulate ``scores[hostname][role][name] += weight`` from one worksheet.

    Args:
        ws: worksheet exposing ``iter_rows(min_row=..., values_only=True)``.
        col_host: 0-based index of the hostname column, or a negative value
            when the column was not found.
        col_resp: 0-based index of the "responsable" column (negative = absent).
        col_ref: 0-based index of the "referent" column (negative = absent).
        weight: amount added to the score for every observation in this sheet.
        scores: nested mapping mutated in place; ``scores[host]`` must provide
            the ``"responsable"`` and ``"referent"`` counters.
        source_label: label used in the progress messages.

    Returns:
        The number of (host, role, name) observations added.
    """
    added = 0
    if col_host < 0:
        # A negative index would silently read the LAST column as the hostname
        # (Python negative indexing), so a sheet without a host column is skipped.
        print(f" [{source_label}] [WARN] colonne hostname introuvable, feuille ignoree")
        return added

    # Only keep the role columns that were actually found in the header.
    role_columns = [
        (role, col)
        for role, col in (("responsable", col_resp), ("referent", col_ref))
        if col >= 0
    ]

    # Row 1 is the header, data starts at row 2.
    for row in ws.iter_rows(min_row=2, values_only=True):
        if col_host >= len(row):
            continue
        host = clean(row[col_host])
        if not host:
            continue
        # Keep the short name only (drop any domain suffix) and compare in lowercase.
        host = host.split(".")[0].lower()
        # Values without a single letter are not treated as hostnames.
        if not any(ch.isalpha() for ch in host):
            continue
        for role, col in role_columns:
            if col >= len(row):
                continue
            name = clean(row[col])
            if not name:
                continue
            name = re.sub(r"\s+", " ", name)
            scores[host][role][name] += weight
            added += 1

    print(f" [{source_label}] +{added} observations (weight {weight})")
    return added
def resolve_header(ws, candidates):
    """Return the 0-based index of the header column matching ``candidates``.

    The first row of ``ws`` is read as the header. Matching is a
    case-insensitive substring test. Candidates are tried in the order given,
    so an earlier (preferred) candidate wins over a later, looser one even if
    the looser one matches a column further to the left.

    Returns:
        The column index, or -1 when no candidate matches any header cell.
    """
    # Lowercase the header once instead of once per (column, candidate) pair.
    header = [(clean(cell.value) or "").lower() for cell in ws[1]]
    for cand in candidates:
        needle = cand.lower()
        for index, title in enumerate(header):
            if needle in title:
                return index
    return -1
def _open_workbook(path):
    """Load the workbook at ``path``; return None when no usable path is given."""
    if not path:
        return None
    if not os.path.exists(path):
        # A wrong path would otherwise be skipped without any trace.
        print(f"[WARN] Fichier introuvable: {path}")
        return None
    print(f"[INFO] Lecture {path}")
    return openpyxl.load_workbook(path, data_only=True)


def _collect_sheet(wb, sheet_name, host_headers, ref_headers, weight, scores, label):
    """Resolve the three columns of ``wb[sheet_name]`` and accumulate its rows."""
    ws = wb[sheet_name]
    collect_from_sheet(
        ws,
        resolve_header(ws, host_headers),
        resolve_header(ws, ["Responsable Domaine"]),
        resolve_header(ws, ref_headers),
        weight,
        scores,
        label,
    )


def _best_name(candidates, contact_canon):
    """Pick the highest-scoring name among ``candidates`` (name -> weight).

    Spellings that share the same ``norm_name`` key are merged before the
    maximum is taken, so case/accent variants of one person vote together.
    The result is the ``contacts`` spelling when the key is known there,
    otherwise the first variant seen, truncated to 200 characters.
    Ties are resolved in favour of the name seen first.
    """
    merged = {}  # norm key -> [display name, cumulated weight]
    for raw_name, weight in candidates.items():
        key = norm_name(raw_name)
        entry = merged.setdefault(key, [contact_canon.get(key, raw_name), 0])
        entry[1] += weight
    name, _score = max(merged.values(), key=lambda entry: entry[1])
    return name[:200]


def main():
    """Command-line entry point.

    Gathers weighted (hostname, role, name) observations from the optional
    Excel workbooks and from ``domain_environments``, then writes the winning
    name per role into ``servers.responsable_nom`` / ``servers.referent_nom``.
    Existing non-blank values are kept unless ``--overwrite`` is given; with
    ``--dry-run`` the planned changes are printed and nothing is written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--ayoub", default=None, help="Chemin Planning Patching 2026_ayoub.xlsx")
    parser.add_argument("--patching", default=None, help="Chemin Plan de Patching serveurs 2026.xlsx")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--overwrite", action="store_true")
    args = parser.parse_args()

    # scores[hostname][role][name] = cumulated weight
    scores = defaultdict(lambda: {"responsable": defaultdict(int), "referent": defaultdict(int)})
    ref_headers = ["Referent technique", "Référent technique"]

    # Source 1: "ayoub" workbook (weight 3)
    wb = _open_workbook(args.ayoub)
    if wb is not None:
        sheet = "Serveurs patchables 2026"
        if sheet in wb.sheetnames:
            _collect_sheet(wb, sheet, ["Asset Name", "Hostname"], ref_headers,
                           3, scores, "ayoub/Serveurs patchables")
        else:
            print(f"[WARN] Feuille '{sheet}' absente de {args.ayoub}")

    # Sources 2 + 3: patching plan workbook (history sheet + weekly sheets)
    wb = _open_workbook(args.patching)
    if wb is not None:
        # History sheet (weight 2): only the first spelling found is used.
        histo = next((s for s in ("Histo-2025", "Histo_2025") if s in wb.sheetnames), None)
        if histo is not None:
            _collect_sheet(wb, histo, ["Asset Name", "Hostname", "Nom"],
                           ref_headers + ["Administrateur"], 2, scores, histo)
        # Weekly sheets named S<1-2 digits> (weight 1 each)
        for sheet_name in wb.sheetnames:
            if re.match(r"S\d{1,2}$", sheet_name, re.IGNORECASE):
                _collect_sheet(wb, sheet_name, ["Asset Name", "Hostname"], ref_headers,
                               1, scores, sheet_name)

    stats = {"updated": 0, "unchanged": 0, "no_server": 0}
    engine = create_engine(DATABASE_URL)
    # The context manager closes the connection even if a query raises.
    with engine.connect() as raw_conn:
        conn = raw_conn.execution_options(isolation_level="AUTOCOMMIT")

        # Source 4: fallback from domain_environments (weight 1)
        print("[INFO] Fallback via domain_environments...")
        rows = conn.execute(text("""
            SELECT s.hostname, de.responsable_nom, de.referent_nom
            FROM servers s
            JOIN domain_environments de ON s.domain_env_id = de.id
        """)).fetchall()
        for r in rows:
            if not r.hostname:
                continue
            h = r.hostname.lower()
            if r.responsable_nom:
                scores[h]["responsable"][r.responsable_nom] += 1
            if r.referent_nom:
                scores[h]["referent"][r.referent_nom] += 1

        # Contact index used for validation / canonical spelling:
        # normalised key -> first contacts.name carrying that key.
        contact_canon = {}
        for c in conn.execute(text("SELECT name FROM contacts")).fetchall():
            key = norm_name(c.name)
            if key and key not in contact_canon:
                contact_canon[key] = c.name

        # One pass over servers: lowercase hostname -> (id, responsable, referent)
        servers = {}
        for r in conn.execute(
            text("SELECT id, hostname, responsable_nom, referent_nom FROM servers")
        ).fetchall():
            if r.hostname:
                servers[r.hostname.lower()] = (r.id, r.responsable_nom, r.referent_nom)

        # Final decision per hostname
        for host, roles in scores.items():
            server = servers.get(host)
            if server is None:
                stats["no_server"] += 1
                continue
            sid, curr_resp, curr_ref = server

            updates = {}
            for column, role, current in (
                ("responsable_nom", "responsable", curr_resp),
                ("referent_nom", "referent", curr_ref),
            ):
                if not roles[role]:
                    continue
                # A non-blank existing value is only replaced with --overwrite.
                if not args.overwrite and (current or "").strip():
                    continue
                new_name = _best_name(roles[role], contact_canon)
                if new_name != current:
                    updates[column] = new_name

            if not updates:
                stats["unchanged"] += 1
                continue

            if args.dry_run:
                print(f" DRY: {host:25s} {updates}")
            else:
                # Column names come from the fixed pair above, never from input
                # data; the values themselves are bound parameters.
                sets = ", ".join(f"{k}=:{k}" for k in updates)
                conn.execute(text(f"UPDATE servers SET {sets} WHERE id=:sid"),
                             {**updates, "sid": sid})
            # Counted in dry-run as well, so the summary shows how many rows
            # would change.
            stats["updated"] += 1

    print(f"\n[DONE] Maj: {stats['updated']} | Inchanges: {stats['unchanged']} "
          f"| Hors base: {stats['no_server']}")


if __name__ == "__main__":
    main()