diff --git a/deploy/Plan de Patching serveurs 2026.xlsx b/deploy/Plan de Patching serveurs 2026.xlsx new file mode 100644 index 0000000..2435783 Binary files /dev/null and b/deploy/Plan de Patching serveurs 2026.xlsx differ diff --git a/tools/import_plan_patching_xlsx.py b/tools/import_plan_patching_xlsx.py new file mode 100644 index 0000000..f5fca1d --- /dev/null +++ b/tools/import_plan_patching_xlsx.py @@ -0,0 +1,247 @@ +"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE). + +Perimetre : 2025 + 2026 uniquement. + - Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem) + - S02..S52 (weekly 2026 : nom de cellule VERT = patche) + +Regles : + - Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche + - Date : col N (14) ; Heure : col O (15) + - Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00 + - La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch + +Usage : + python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run] +""" +import os +import re +import sys +import glob +import argparse +from datetime import datetime, time, date, timedelta +from pathlib import Path + +import openpyxl +from sqlalchemy import create_engine, text + +ROOT = Path(__file__).resolve().parent.parent +DATABASE_URL = (os.getenv("DATABASE_URL_DEMO") + or os.getenv("DATABASE_URL") + or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db") + + +def is_green(cell): + """True si la cellule a un fond vert (dominante G > R et G > B).""" + if cell.fill is None or cell.fill.fgColor is None: + return False + fc = cell.fill.fgColor + rgb = None + if fc.type == "rgb" and fc.rgb: + rgb = fc.rgb.upper() + elif fc.type == "theme": + # Themes Office 9/6 = green-ish accents + return fc.theme in (9, 6) + if not rgb or len(rgb) < 6: + return False + try: + rr = int(rgb[-6:-4], 16) + gg = int(rgb[-4:-2], 16) + bb = int(rgb[-2:], 16) + except ValueError: + return False + return gg > 120 and gg > rr + 30 and gg > bb + 30 + + +def parse_week_num(sheet_name): + m = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip()) + return int(m.group(1)) if m else None + + +def monday_of_iso_week(year, week): + jan4 = date(year, 1, 4) + start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1) + return start + + +def parse_hour(val): + if val is None: + return None + if isinstance(val, time): + return val + if isinstance(val, datetime): + return val.time() + s = str(val).strip().lower().replace("h", ":") + m = re.match(r"(\d{1,2})(?::(\d{2}))?", s) + if not m: + return None + hh = int(m.group(1)) + mm = int(m.group(2) or 0) + if 0 <= hh < 24 and 0 <= mm < 60: + return time(hh, mm) + return None + + +def parse_date_cell(val): + if val is None: + return None + if isinstance(val, datetime): + return val + if isinstance(val, date): + return datetime.combine(val, time(0, 0)) + s = str(val).strip() + m = re.match(r"(\d{2})/(\d{2})/(\d{4})", s) + if m: + try: + return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1))) + except Exception: + return None + return None + + +def find_xlsx(): + for p in [ + ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx", + ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx", + ]: + if p.exists(): + return str(p) + hits = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx")) + return hits[0] if hits else None + + +def collect_events(wb, hosts): + """Retourne liste dicts patch_history : {sid, dt, status, notes}. + + 3 champs toujours renseignes : semaine (dans notes), date (date_patch::date), + heure (date_patch::time — 00:00 si inconnue). + """ + events = [] + stats = {"histo_2025_s1": 0, "histo_2025_s2": 0, + "weekly": 0, "no_server": 0, "weekly_no_color": 0} + + # --- Histo-2025 : col L (12) date S1, col M (13) flag S1, col O (15) date S2, col P (16) flag S2 + if "Histo-2025" in wb.sheetnames: + ws = wb["Histo-2025"] + for row_idx in range(2, ws.max_row + 1): + hn = ws.cell(row=row_idx, column=1).value + if not hn: + continue + sid = hosts.get(str(hn).strip().lower()) + if not sid: + stats["no_server"] += 1 + continue + + date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value) + flag_s1 = ws.cell(row=row_idx, column=13).value + if flag_s1 and isinstance(flag_s1, int) and flag_s1 >= 1: + dt = date_s1 or datetime(2025, 6, 30, 0, 0) + events.append({"sid": sid, "dt": dt, "status": "ok", + "notes": f"Histo-2025 S1 (x{flag_s1})"}) + stats["histo_2025_s1"] += 1 + + date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value) + flag_s2 = ws.cell(row=row_idx, column=16).value + if flag_s2 and isinstance(flag_s2, int) and flag_s2 >= 1: + dt = date_s2 or datetime(2025, 12, 31, 0, 0) + events.append({"sid": sid, "dt": dt, "status": "ok", + "notes": f"Histo-2025 S2 (x{flag_s2})"}) + stats["histo_2025_s2"] += 1 + + # --- Weekly sheets S02..S52 : nom colore VERT = patche (2026) + for sname in wb.sheetnames: + wk = parse_week_num(sname) + if wk is None or not (1 <= wk <= 53): + continue + ws = wb[sname] + fallback_monday = monday_of_iso_week(2026, wk) + + for row_idx in range(2, ws.max_row + 1): + hn_cell = ws.cell(row=row_idx, column=1) + hn = hn_cell.value + if not hn or not any(c.isalpha() for c in str(hn)): + continue + if not is_green(hn_cell): + stats["weekly_no_color"] += 1 + continue + + hn_norm = str(hn).strip().split(".")[0].lower() + sid = hosts.get(hn_norm) + if not sid: + stats["no_server"] += 1 + continue + + # col N (14) = Date, col O (15) = Heure + date_val = ws.cell(row=row_idx, column=14).value + hour_val = ws.cell(row=row_idx, column=15).value + dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0)) + hr = parse_hour(hour_val) + if hr: + dt_base = datetime.combine(dt_base.date(), hr) + # sinon : heure = 00:00 par defaut (deja dans dt_base) + + events.append({"sid": sid, "dt": dt_base, "status": "ok", + "notes": f"Semaine {wk:02d} 2026"}) + stats["weekly"] += 1 + + return events, stats + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("xlsx", nargs="?", default=None) + parser.add_argument("--truncate", action="store_true", + help="TRUNCATE patch_history avant import (source de verite)") + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + xlsx = args.xlsx or find_xlsx() + if not xlsx or not os.path.exists(xlsx): + print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.") + sys.exit(1) + + print(f"[INFO] Fichier: {xlsx}") + engine = create_engine(DATABASE_URL) + print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}") + + wb = openpyxl.load_workbook(xlsx, data_only=True) + print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}") + + with engine.begin() as conn: + hosts = {} + for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall(): + hosts[r.hostname.lower()] = r.id + print(f"[INFO] Servers en DB: {len(hosts)}") + + events, stats = collect_events(wb, hosts) + print("[INFO] Events detectes:") + for k, v in stats.items(): + print(f" {v:5d} {k}") + print(f"[INFO] TOTAL events: {len(events)}") + + if args.dry_run: + print("[DRY-RUN] Aucun write") + return + + if args.truncate: + print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE") + conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE")) + + inserted = skipped = 0 + for ev in events: + existing = conn.execute(text( + "SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt" + ), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone() + if existing: + skipped += 1 + continue + conn.execute(text(""" + INSERT INTO patch_history (server_id, date_patch, status, notes) + VALUES (:sid, :dt, :status, :notes) + """), ev) + inserted += 1 + + print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}") + + +if __name__ == "__main__": + main()