"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE). Perimetre : 2025 + 2026 uniquement. - Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem) - S02..S52 (weekly 2026 : nom de cellule VERT = patche) Regles : - Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche - Date : col N (14) ; Heure : col O (15) - Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00 - La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch Usage : python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run] """ import os import re import sys import glob import argparse from datetime import datetime, time, date, timedelta from pathlib import Path import openpyxl from sqlalchemy import create_engine, text ROOT = Path(__file__).resolve().parent.parent DATABASE_URL = (os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db") def is_green(cell): """True si la cellule a un fond vert (dominante G > R et G > B).""" if cell.fill is None or cell.fill.fgColor is None: return False fc = cell.fill.fgColor rgb = None if fc.type == "rgb" and fc.rgb: rgb = fc.rgb.upper() elif fc.type == "theme": # Themes Office 9/6 = green-ish accents return fc.theme in (9, 6) if not rgb or len(rgb) < 6: return False try: rr = int(rgb[-6:-4], 16) gg = int(rgb[-4:-2], 16) bb = int(rgb[-2:], 16) except ValueError: return False return gg > 120 and gg > rr + 30 and gg > bb + 30 def parse_week_num(sheet_name): m = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip()) return int(m.group(1)) if m else None def monday_of_iso_week(year, week): jan4 = date(year, 1, 4) start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1) return start def parse_hour(val): if val is None: return None if isinstance(val, time): return val if isinstance(val, datetime): return val.time() s = str(val).strip().lower().replace("h", ":") m = re.match(r"(\d{1,2})(?::(\d{2}))?", s) if not m: return None hh = int(m.group(1)) mm = int(m.group(2) or 0) if 0 <= hh < 24 and 0 <= mm < 60: return time(hh, mm) return None def parse_date_cell(val): if val is None: return None if isinstance(val, datetime): return val if isinstance(val, date): return datetime.combine(val, time(0, 0)) s = str(val).strip() m = re.match(r"(\d{2})/(\d{2})/(\d{4})", s) if m: try: return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1))) except Exception: return None return None def find_xlsx(): for p in [ ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx", ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx", ]: if p.exists(): return str(p) hits = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx")) return hits[0] if hits else None def collect_events(wb, hosts): """Retourne liste dicts patch_history : {sid, dt, status, notes}. 3 champs toujours renseignes : semaine (dans notes), date (date_patch::date), heure (date_patch::time — 00:00 si inconnue). """ events = [] stats = {"histo_2025_s1": 0, "histo_2025_s2": 0, "weekly": 0, "no_server": 0, "weekly_no_color": 0} # --- Histo-2025 : col B (2) Intervenant, col L (12) date S1, col M (13) flag S1, col O (15) date S2, col P (16) flag S2 if "Histo-2025" in wb.sheetnames: ws = wb["Histo-2025"] for row_idx in range(2, ws.max_row + 1): hn = ws.cell(row=row_idx, column=1).value if not hn: continue sid = hosts.get(str(hn).strip().lower()) if not sid: stats["no_server"] += 1 continue interv = ws.cell(row=row_idx, column=2).value interv = str(interv).strip() if interv else None date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value) flag_s1 = ws.cell(row=row_idx, column=13).value if flag_s1 and isinstance(flag_s1, int) and flag_s1 >= 1: dt = date_s1 or datetime(2025, 6, 30, 0, 0) events.append({"sid": sid, "dt": dt, "status": "ok", "notes": f"Histo-2025 S1 (x{flag_s1})", "interv": interv}) stats["histo_2025_s1"] += 1 date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value) flag_s2 = ws.cell(row=row_idx, column=16).value if flag_s2 and isinstance(flag_s2, int) and flag_s2 >= 1: dt = date_s2 or datetime(2025, 12, 31, 0, 0) events.append({"sid": sid, "dt": dt, "status": "ok", "notes": f"Histo-2025 S2 (x{flag_s2})", "interv": interv}) stats["histo_2025_s2"] += 1 # --- Weekly sheets S02..S52 : nom colore VERT = patche (2026) for sname in wb.sheetnames: wk = parse_week_num(sname) if wk is None or not (1 <= wk <= 53): continue ws = wb[sname] fallback_monday = monday_of_iso_week(2026, wk) for row_idx in range(2, ws.max_row + 1): hn_cell = ws.cell(row=row_idx, column=1) hn = hn_cell.value if not hn or not any(c.isalpha() for c in str(hn)): continue if not is_green(hn_cell): stats["weekly_no_color"] += 1 continue hn_norm = str(hn).strip().split(".")[0].lower() sid = hosts.get(hn_norm) if not sid: stats["no_server"] += 1 continue interv = ws.cell(row=row_idx, column=2).value interv = str(interv).strip() if interv else None # col N (14) = Date, col O (15) = Heure date_val = ws.cell(row=row_idx, column=14).value hour_val = ws.cell(row=row_idx, column=15).value dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0)) hr = parse_hour(hour_val) if hr: dt_base = datetime.combine(dt_base.date(), hr) # sinon : heure = 00:00 par defaut (deja dans dt_base) events.append({"sid": sid, "dt": dt_base, "status": "ok", "notes": f"Semaine {wk:02d} 2026", "interv": interv}) stats["weekly"] += 1 return events, stats def main(): parser = argparse.ArgumentParser() parser.add_argument("xlsx", nargs="?", default=None) parser.add_argument("--truncate", action="store_true", help="TRUNCATE patch_history avant import (source de verite)") parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() xlsx = args.xlsx or find_xlsx() if not xlsx or not os.path.exists(xlsx): print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.") sys.exit(1) print(f"[INFO] Fichier: {xlsx}") engine = create_engine(DATABASE_URL) print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}") wb = openpyxl.load_workbook(xlsx, data_only=True) print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}") with engine.begin() as conn: hosts = {} for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall(): hosts[r.hostname.lower()] = r.id print(f"[INFO] Servers en DB: {len(hosts)}") events, stats = collect_events(wb, hosts) print("[INFO] Events detectes:") for k, v in stats.items(): print(f" {v:5d} {k}") print(f"[INFO] TOTAL events: {len(events)}") if args.dry_run: print("[DRY-RUN] Aucun write") return if args.truncate: print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE") conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE")) inserted = skipped = 0 for ev in events: existing = conn.execute(text( "SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt" ), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone() if existing: skipped += 1 continue conn.execute(text(""" INSERT INTO patch_history (server_id, date_patch, status, notes, intervenant_name) VALUES (:sid, :dt, :status, :notes, :interv) """), ev) inserted += 1 print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}") if __name__ == "__main__": main()