Add tool import_plan_patching_xlsx : historique 2025+2026 (vert = patche)
This commit is contained in:
parent
cfb9cf865c
commit
14f809335e
BIN
deploy/Plan de Patching serveurs 2026.xlsx
Normal file
BIN
deploy/Plan de Patching serveurs 2026.xlsx
Normal file
Binary file not shown.
247
tools/import_plan_patching_xlsx.py
Normal file
247
tools/import_plan_patching_xlsx.py
Normal file
@ -0,0 +1,247 @@
|
||||
"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE).
|
||||
|
||||
Perimetre : 2025 + 2026 uniquement.
|
||||
- Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem)
|
||||
- S02..S52 (weekly 2026 : nom de cellule VERT = patche)
|
||||
|
||||
Regles :
|
||||
- Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche
|
||||
- Date : col N (14) ; Heure : col O (15)
|
||||
- Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00
|
||||
- La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch
|
||||
|
||||
Usage :
|
||||
python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run]
|
||||
"""
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import glob
|
||||
import argparse
|
||||
from datetime import datetime, time, date, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import openpyxl
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
|
||||
or os.getenv("DATABASE_URL")
|
||||
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
|
||||
|
||||
|
||||
def is_green(cell):
|
||||
"""True si la cellule a un fond vert (dominante G > R et G > B)."""
|
||||
if cell.fill is None or cell.fill.fgColor is None:
|
||||
return False
|
||||
fc = cell.fill.fgColor
|
||||
rgb = None
|
||||
if fc.type == "rgb" and fc.rgb:
|
||||
rgb = fc.rgb.upper()
|
||||
elif fc.type == "theme":
|
||||
# Themes Office 9/6 = green-ish accents
|
||||
return fc.theme in (9, 6)
|
||||
if not rgb or len(rgb) < 6:
|
||||
return False
|
||||
try:
|
||||
rr = int(rgb[-6:-4], 16)
|
||||
gg = int(rgb[-4:-2], 16)
|
||||
bb = int(rgb[-2:], 16)
|
||||
except ValueError:
|
||||
return False
|
||||
return gg > 120 and gg > rr + 30 and gg > bb + 30
|
||||
|
||||
|
||||
def parse_week_num(sheet_name):
|
||||
m = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip())
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
|
||||
def monday_of_iso_week(year, week):
|
||||
jan4 = date(year, 1, 4)
|
||||
start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
|
||||
return start
|
||||
|
||||
|
||||
def parse_hour(val):
|
||||
if val is None:
|
||||
return None
|
||||
if isinstance(val, time):
|
||||
return val
|
||||
if isinstance(val, datetime):
|
||||
return val.time()
|
||||
s = str(val).strip().lower().replace("h", ":")
|
||||
m = re.match(r"(\d{1,2})(?::(\d{2}))?", s)
|
||||
if not m:
|
||||
return None
|
||||
hh = int(m.group(1))
|
||||
mm = int(m.group(2) or 0)
|
||||
if 0 <= hh < 24 and 0 <= mm < 60:
|
||||
return time(hh, mm)
|
||||
return None
|
||||
|
||||
|
||||
def parse_date_cell(val):
|
||||
if val is None:
|
||||
return None
|
||||
if isinstance(val, datetime):
|
||||
return val
|
||||
if isinstance(val, date):
|
||||
return datetime.combine(val, time(0, 0))
|
||||
s = str(val).strip()
|
||||
m = re.match(r"(\d{2})/(\d{2})/(\d{4})", s)
|
||||
if m:
|
||||
try:
|
||||
return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)))
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
|
||||
def find_xlsx():
|
||||
for p in [
|
||||
ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx",
|
||||
ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx",
|
||||
]:
|
||||
if p.exists():
|
||||
return str(p)
|
||||
hits = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx"))
|
||||
return hits[0] if hits else None
|
||||
|
||||
|
||||
def collect_events(wb, hosts):
|
||||
"""Retourne liste dicts patch_history : {sid, dt, status, notes}.
|
||||
|
||||
3 champs toujours renseignes : semaine (dans notes), date (date_patch::date),
|
||||
heure (date_patch::time — 00:00 si inconnue).
|
||||
"""
|
||||
events = []
|
||||
stats = {"histo_2025_s1": 0, "histo_2025_s2": 0,
|
||||
"weekly": 0, "no_server": 0, "weekly_no_color": 0}
|
||||
|
||||
# --- Histo-2025 : col L (12) date S1, col M (13) flag S1, col O (15) date S2, col P (16) flag S2
|
||||
if "Histo-2025" in wb.sheetnames:
|
||||
ws = wb["Histo-2025"]
|
||||
for row_idx in range(2, ws.max_row + 1):
|
||||
hn = ws.cell(row=row_idx, column=1).value
|
||||
if not hn:
|
||||
continue
|
||||
sid = hosts.get(str(hn).strip().lower())
|
||||
if not sid:
|
||||
stats["no_server"] += 1
|
||||
continue
|
||||
|
||||
date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value)
|
||||
flag_s1 = ws.cell(row=row_idx, column=13).value
|
||||
if flag_s1 and isinstance(flag_s1, int) and flag_s1 >= 1:
|
||||
dt = date_s1 or datetime(2025, 6, 30, 0, 0)
|
||||
events.append({"sid": sid, "dt": dt, "status": "ok",
|
||||
"notes": f"Histo-2025 S1 (x{flag_s1})"})
|
||||
stats["histo_2025_s1"] += 1
|
||||
|
||||
date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value)
|
||||
flag_s2 = ws.cell(row=row_idx, column=16).value
|
||||
if flag_s2 and isinstance(flag_s2, int) and flag_s2 >= 1:
|
||||
dt = date_s2 or datetime(2025, 12, 31, 0, 0)
|
||||
events.append({"sid": sid, "dt": dt, "status": "ok",
|
||||
"notes": f"Histo-2025 S2 (x{flag_s2})"})
|
||||
stats["histo_2025_s2"] += 1
|
||||
|
||||
# --- Weekly sheets S02..S52 : nom colore VERT = patche (2026)
|
||||
for sname in wb.sheetnames:
|
||||
wk = parse_week_num(sname)
|
||||
if wk is None or not (1 <= wk <= 53):
|
||||
continue
|
||||
ws = wb[sname]
|
||||
fallback_monday = monday_of_iso_week(2026, wk)
|
||||
|
||||
for row_idx in range(2, ws.max_row + 1):
|
||||
hn_cell = ws.cell(row=row_idx, column=1)
|
||||
hn = hn_cell.value
|
||||
if not hn or not any(c.isalpha() for c in str(hn)):
|
||||
continue
|
||||
if not is_green(hn_cell):
|
||||
stats["weekly_no_color"] += 1
|
||||
continue
|
||||
|
||||
hn_norm = str(hn).strip().split(".")[0].lower()
|
||||
sid = hosts.get(hn_norm)
|
||||
if not sid:
|
||||
stats["no_server"] += 1
|
||||
continue
|
||||
|
||||
# col N (14) = Date, col O (15) = Heure
|
||||
date_val = ws.cell(row=row_idx, column=14).value
|
||||
hour_val = ws.cell(row=row_idx, column=15).value
|
||||
dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0))
|
||||
hr = parse_hour(hour_val)
|
||||
if hr:
|
||||
dt_base = datetime.combine(dt_base.date(), hr)
|
||||
# sinon : heure = 00:00 par defaut (deja dans dt_base)
|
||||
|
||||
events.append({"sid": sid, "dt": dt_base, "status": "ok",
|
||||
"notes": f"Semaine {wk:02d} 2026"})
|
||||
stats["weekly"] += 1
|
||||
|
||||
return events, stats
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("xlsx", nargs="?", default=None)
|
||||
parser.add_argument("--truncate", action="store_true",
|
||||
help="TRUNCATE patch_history avant import (source de verite)")
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
xlsx = args.xlsx or find_xlsx()
|
||||
if not xlsx or not os.path.exists(xlsx):
|
||||
print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"[INFO] Fichier: {xlsx}")
|
||||
engine = create_engine(DATABASE_URL)
|
||||
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
|
||||
|
||||
wb = openpyxl.load_workbook(xlsx, data_only=True)
|
||||
print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}")
|
||||
|
||||
with engine.begin() as conn:
|
||||
hosts = {}
|
||||
for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall():
|
||||
hosts[r.hostname.lower()] = r.id
|
||||
print(f"[INFO] Servers en DB: {len(hosts)}")
|
||||
|
||||
events, stats = collect_events(wb, hosts)
|
||||
print("[INFO] Events detectes:")
|
||||
for k, v in stats.items():
|
||||
print(f" {v:5d} {k}")
|
||||
print(f"[INFO] TOTAL events: {len(events)}")
|
||||
|
||||
if args.dry_run:
|
||||
print("[DRY-RUN] Aucun write")
|
||||
return
|
||||
|
||||
if args.truncate:
|
||||
print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE")
|
||||
conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE"))
|
||||
|
||||
inserted = skipped = 0
|
||||
for ev in events:
|
||||
existing = conn.execute(text(
|
||||
"SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt"
|
||||
), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone()
|
||||
if existing:
|
||||
skipped += 1
|
||||
continue
|
||||
conn.execute(text("""
|
||||
INSERT INTO patch_history (server_id, date_patch, status, notes)
|
||||
VALUES (:sid, :dt, :status, :notes)
|
||||
"""), ev)
|
||||
inserted += 1
|
||||
|
||||
print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user