patchcenter/tools/import_plan_patching_xlsx.py

276 lines
9.3 KiB
Python

"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE).
Perimetre : 2025 + 2026 uniquement.
- Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem)
- S02..S52 (weekly 2026 : nom de cellule VERT = patche)
Regles :
- Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche
- Date : col N (14) ; Heure : col O (15)
- Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00
- La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch
Usage :
python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run]
"""
import os
import re
import sys
import glob
import argparse
from datetime import datetime, time, date, timedelta
from pathlib import Path
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
# Normalisation des noms d'intervenants (xlsx libre -> canonique)
INTERVENANT_MAP = {
"sophie/joel": "Joel",
"joel/sophie": "Joel",
}
def normalize_intervenant(name):
if not name:
return None
s = str(name).strip()
return INTERVENANT_MAP.get(s.lower(), s)
def is_green(cell):
"""True si la cellule a un fond vert (dominante G > R et G > B)."""
if cell.fill is None or cell.fill.fgColor is None:
return False
fc = cell.fill.fgColor
rgb = None
if fc.type == "rgb" and fc.rgb:
rgb = fc.rgb.upper()
elif fc.type == "theme":
# Themes Office 9/6 = green-ish accents
return fc.theme in (9, 6)
if not rgb or len(rgb) < 6:
return False
try:
rr = int(rgb[-6:-4], 16)
gg = int(rgb[-4:-2], 16)
bb = int(rgb[-2:], 16)
except ValueError:
return False
return gg > 120 and gg > rr + 30 and gg > bb + 30
def parse_week_num(sheet_name):
m = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip())
return int(m.group(1)) if m else None
def monday_of_iso_week(year, week):
jan4 = date(year, 1, 4)
start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
return start
def parse_hour(val):
if val is None:
return None
if isinstance(val, time):
return val
if isinstance(val, datetime):
return val.time()
s = str(val).strip().lower().replace("h", ":")
m = re.match(r"(\d{1,2})(?::(\d{2}))?", s)
if not m:
return None
hh = int(m.group(1))
mm = int(m.group(2) or 0)
if 0 <= hh < 24 and 0 <= mm < 60:
return time(hh, mm)
return None
def parse_date_cell(val):
if val is None:
return None
if isinstance(val, datetime):
return val
if isinstance(val, date):
return datetime.combine(val, time(0, 0))
s = str(val).strip()
m = re.match(r"(\d{2})/(\d{2})/(\d{4})", s)
if m:
try:
return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)))
except Exception:
return None
return None
def find_xlsx():
for p in [
ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx",
ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx",
]:
if p.exists():
return str(p)
hits = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx"))
return hits[0] if hits else None
def collect_events(wb, hosts):
"""Retourne liste dicts patch_history : {sid, dt, status, notes}.
3 champs toujours renseignes : semaine (dans notes), date (date_patch::date),
heure (date_patch::time — 00:00 si inconnue).
"""
events = []
stats = {"histo_2025_s1": 0, "histo_2025_s2": 0,
"weekly": 0, "no_server": 0, "weekly_no_color": 0}
# --- Histo-2025 : col B (2) Intervenant, col L (12) date S1, col M (13) flag S1, col O (15) date S2, col P (16) flag S2
if "Histo-2025" in wb.sheetnames:
ws = wb["Histo-2025"]
for row_idx in range(2, ws.max_row + 1):
hn = ws.cell(row=row_idx, column=1).value
if not hn:
continue
sid = hosts.get(str(hn).strip().lower())
if not sid:
stats["no_server"] += 1
continue
interv = ws.cell(row=row_idx, column=2).value
interv = str(interv).strip() if interv else None
date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value)
flag_s1 = ws.cell(row=row_idx, column=13).value
if flag_s1 and isinstance(flag_s1, int) and flag_s1 >= 1:
dt = date_s1 or datetime(2025, 6, 30, 0, 0)
events.append({"sid": sid, "dt": dt, "status": "ok",
"notes": f"Histo-2025 S1 (x{flag_s1})",
"interv": interv})
stats["histo_2025_s1"] += 1
date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value)
flag_s2 = ws.cell(row=row_idx, column=16).value
if flag_s2 and isinstance(flag_s2, int) and flag_s2 >= 1:
dt = date_s2 or datetime(2025, 12, 31, 0, 0)
events.append({"sid": sid, "dt": dt, "status": "ok",
"notes": f"Histo-2025 S2 (x{flag_s2})",
"interv": interv})
stats["histo_2025_s2"] += 1
# --- Weekly sheets S02..S52 : nom colore VERT = patche (2026)
for sname in wb.sheetnames:
wk = parse_week_num(sname)
if wk is None or not (1 <= wk <= 53):
continue
ws = wb[sname]
fallback_monday = monday_of_iso_week(2026, wk)
for row_idx in range(2, ws.max_row + 1):
hn_cell = ws.cell(row=row_idx, column=1)
hn = hn_cell.value
if not hn or not any(c.isalpha() for c in str(hn)):
continue
if not is_green(hn_cell):
stats["weekly_no_color"] += 1
continue
hn_norm = str(hn).strip().split(".")[0].lower()
sid = hosts.get(hn_norm)
if not sid:
stats["no_server"] += 1
continue
interv = ws.cell(row=row_idx, column=2).value
interv = str(interv).strip() if interv else None
# col N (14) = Date, col O (15) = Heure
date_val = ws.cell(row=row_idx, column=14).value
hour_val = ws.cell(row=row_idx, column=15).value
dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0))
hr = parse_hour(hour_val)
if hr:
dt_base = datetime.combine(dt_base.date(), hr)
# sinon : heure = 00:00 par defaut (deja dans dt_base)
# Skip si date de patch dans le futur (cellule coloree en avance)
if dt_base > datetime.now():
stats["weekly_future"] = stats.get("weekly_future", 0) + 1
continue
events.append({"sid": sid, "dt": dt_base, "status": "ok",
"notes": f"Semaine {wk:02d} 2026",
"interv": interv})
stats["weekly"] += 1
return events, stats
def main():
parser = argparse.ArgumentParser()
parser.add_argument("xlsx", nargs="?", default=None)
parser.add_argument("--truncate", action="store_true",
help="TRUNCATE patch_history avant import (source de verite)")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
xlsx = args.xlsx or find_xlsx()
if not xlsx or not os.path.exists(xlsx):
print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
wb = openpyxl.load_workbook(xlsx, data_only=True)
print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}")
with engine.begin() as conn:
hosts = {}
for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall():
hosts[r.hostname.lower()] = r.id
print(f"[INFO] Servers en DB: {len(hosts)}")
events, stats = collect_events(wb, hosts)
print("[INFO] Events detectes:")
for k, v in stats.items():
print(f" {v:5d} {k}")
print(f"[INFO] TOTAL events: {len(events)}")
if args.dry_run:
print("[DRY-RUN] Aucun write")
return
if args.truncate:
print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE")
conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE"))
inserted = skipped = 0
for ev in events:
existing = conn.execute(text(
"SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt"
), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone()
if existing:
skipped += 1
continue
conn.execute(text("""
INSERT INTO patch_history (server_id, date_patch, status, notes, intervenant_name)
VALUES (:sid, :dt, :status, :notes, :interv)
"""), ev)
inserted += 1
print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}")
if __name__ == "__main__":
main()