"""Router for importing the patching planning from Excel.

Features:
- xlsx upload (multi-sheet, 1 sheet = 1 week S02..S52)
- List of previous imports
- Display of an import's content: week selector + server table
- JSON endpoints for AJAX (week selection without page reload)

The pre-patching module and the by-step patching will be plugged in at
step 2/3.
"""
import io
import json
import re
from datetime import date, datetime
from fastapi import APIRouter, Request, Depends, UploadFile, File, Form
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context
from ..config import APP_NAME

router = APIRouter()
templates = Jinja2Templates(directory="app/templates")

# Columns expected in the Sxx sheets: logical column name -> regex patterns
# searched in the normalized (whitespace-collapsed, lower-cased) header text.
# Dict order is the priority order: _build_column_map assigns a header to the
# first logical column with a matching pattern that has not been used yet.
# The 2026 file has 12 header variants depending on the week
# (old format S02-S06, new DTS format S07+).
# NOTE(review): r"\bnom\b" (asset_name) also matches headers such as
# "Version OS->Nom" or "Nom complet"; those only reach their intended logical
# column when the asset-name header appears earlier in the sheet — confirm
# against the real files.
KNOWN_COLUMNS = {
    "asset_name": [r"asset\s*name", r"\bnom\b"],
    "intervenant": [r"intervenant"],
    "environnement": [r"environnement|environement"],
    "domaine": [r"^domaine"],
    "os": [r"^\s*os\s*$"],
    "os_version": [r"version\s*os"],  # matches "Version OS" and "Version OS->Nom"
    "application_name": [r"logiciel", r"application", r"^nom\s*complet$"],
    "valideur_ra": [r"valideur"],
    "responsable_domaine_dts": [r"responsable\s*domaine"],
    "description": [r"description"],
    "assistant": [r"^assistant"],
    "referent_technique": [r"r.f.rent\s*tech"],
    "mode_operatoire": [r"mode\s*op.ratoire"],
    "impacts": [r"^impact"],
    "commentaire": [r"commentaire"],
    "base_de_donnees": [r"base\s*de\s*donn"],
    "duree_coupure": [r"dur.+coupure"],
    "jour": [r"^\s*jour\s*$", r"^\s*date\s*$", r"date\s*pr.+vis"],
    "heure": [r"^\s*heure"],
    "pb_espace_disque": [r"espace\s*disque"],
    "date_patch_realise": [r"date\s*du?\s*patch.+r.+alis"],
}
# Week sheets are named "S02", "s7", "S 52", ...; group 1 is the week number
# (one optional leading zero is skipped).
SHEET_WEEK_RE = re.compile(r"^S\s*0?(\d+)$", re.IGNORECASE)

# Bind parameter -> logical column, for the columns stored as plain text.
_TEXT_BINDS = {
    "it": "intervenant",
    "en": "environnement",
    "do": "domaine",
    "os": "os",
    "ov": "os_version",
    "ap": "application_name",
    "vr": "valideur_ra",
    "rd": "responsable_domaine_dts",
    "de": "description",
    "as_": "assistant",
    "rt": "referent_technique",
    "mo": "mode_operatoire",
    "im": "impacts",
    "co": "commentaire",
    "bdd": "base_de_donnees",
    "dc": "duree_coupure",
    "hr": "heure",
}

# Built once at import time instead of once per inserted row.
_INSERT_ROW_SQL = text("""
    INSERT INTO patch_planning_import_rows (
        import_id, sheet_name, week_number, row_index,
        asset_name, intervenant, environnement, domaine, os, os_version,
        application_name, valideur_ra, responsable_domaine_dts, description,
        assistant, referent_technique, mode_operatoire, impacts, commentaire,
        base_de_donnees, duree_coupure, jour, heure, pb_espace_disque,
        date_patch_realise, raw_data, server_id
    ) VALUES (
        :imp, :sn, :wn, :ri,
        :an, :it, :en, :do, :os, :ov,
        :ap, :vr, :rd, :de,
        :as_, :rt, :mo, :im, :co,
        :bdd, :dc, :jr, :hr, :pb,
        :dpr, :raw, :sid
    )
""")

# Keys of one row in the JSON payload of import_sheet_json, in output order.
_ROW_JSON_FIELDS = (
    "id", "row_index", "asset_name", "intervenant", "environnement",
    "domaine", "os", "os_version", "application_name", "valideur_ra",
    "responsable_domaine_dts", "description", "assistant",
    "referent_technique", "mode_operatoire", "impacts", "commentaire",
    "base_de_donnees", "duree_coupure", "jour", "heure",
    "pb_espace_disque", "date_patch_realise", "server_id",
    "resolved_hostname",
)
# Fields serialized with .isoformat() (None when empty).
_ROW_DATE_FIELDS = frozenset({"jour", "date_patch_realise"})


def _can_import(perms):
    """Return True when can_edit grants "planning" or "campaigns" to *perms*."""
    return can_edit(perms, "planning") or can_edit(perms, "campaigns")


def _can_view_imports(perms):
    """Return True when can_view grants "planning" or "campaigns" to *perms*."""
    return can_view(perms, "planning") or can_view(perms, "campaigns")


def _to_iso(v):
    """Return *v* as a string: ISO format for dates/datetimes, str() otherwise.

    None is passed through unchanged.
    """
    if v is None:
        return None
    if isinstance(v, (datetime, date)):
        return v.isoformat()
    return str(v)


def _coerce_date(v):
    """Return *v* as a ``date``, or None when it is not a date/datetime.

    Text cells are not parsed: any value that is neither a ``datetime`` nor a
    ``date`` yields None.
    """
    if v is None or v == "":
        return None
    # datetime is a subclass of date, so it must be tested first.
    if isinstance(v, datetime):
        return v.date()
    if isinstance(v, date):
        return v
    return None


def _coerce_bool(v):
    """Map a cell value to True / False, or None when it is not recognised.

    Accepts real booleans and the (case-insensitive, stripped) words
    true/vrai/oui/yes/1/x and false/faux/non/no/0.
    """
    if v is None or v == "":
        return None
    if isinstance(v, bool):
        return v
    word = str(v).strip().lower()
    if word in ("true", "vrai", "oui", "yes", "1", "x"):
        return True
    if word in ("false", "faux", "non", "no", "0"):
        return False
    return None


def _norm_header(h):
    """Normalize a header cell: collapse whitespace runs, strip, lower-case."""
    if h is None:
        return ""
    return re.sub(r"\s+", " ", str(h)).strip().lower()


def _text_or_none(v):
    """Return ``str(v)`` for truthy values, None otherwise.

    NOTE(review): falsy non-empty values such as a numeric 0 are stored as
    NULL. This is kept as-is because a 0 may come from a formula pointing at
    an empty cell — confirm whether e.g. a 0 outage duration must be kept.
    """
    return str(v) if v else None


def _build_column_map(headers):
    """Map column index -> logical name (asset_name, intervenant, ...).

    Each header is normalized and matched against KNOWN_COLUMNS in dict order;
    the first logical column with a matching pattern that is not already
    assigned wins. A logical column is assigned to at most one index, and
    headers matching nothing are left out of the map.
    """
    col_map = {}
    claimed = set()
    for idx, header in enumerate(headers):
        norm = _norm_header(header)
        if not norm:
            continue
        for logical, patterns in KNOWN_COLUMNS.items():
            if logical in claimed:
                continue
            if any(re.search(pat, norm) for pat in patterns):
                col_map[idx] = logical
                claimed.add(logical)
                break
    return col_map


def _parse_sheet(ws, sheet_name):
    """Parse one worksheet into ``(headers, records)``.

    The first row is the header row. Every following non-blank row becomes a
    dict holding ``row_index`` (1 for the first row after the header), one
    entry per recognised logical column, and ``_raw``: normalized header (or
    ``col_<n>``) -> value rendered by ``_to_iso``. An empty sheet yields
    ``([], [])``.

    *sheet_name* is not used by the parser; it is kept for the callers.
    """
    rows_iter = ws.iter_rows(values_only=True)
    first_row = next(rows_iter, None)
    if first_row is None:
        return [], []
    headers = list(first_row)
    col_map = _build_column_map(headers)
    # Normalized once here rather than for every cell of every row.
    labels = [_norm_header(h) or f"col_{i}" for i, h in enumerate(headers)]

    parsed = []
    for ridx, row in enumerate(rows_iter, start=1):
        if row is None:
            continue
        # Skip rows whose cells are all empty or whitespace-only text.
        if all(c is None or (isinstance(c, str) and not c.strip()) for c in row):
            continue
        rec = {"row_index": ridx}
        raw = {}
        for cidx, val in enumerate(row):
            # Rows may be wider than the header row.
            label = labels[cidx] if cidx < len(labels) else f"col_{cidx}"
            raw[label] = _to_iso(val)
            if cidx in col_map:
                rec[col_map[cidx]] = val
        rec["_raw"] = raw
        parsed.append(rec)
    return headers, parsed


def _list_imports(db):
    """Return the 50 most recently uploaded imports with the uploader's name."""
    return db.execute(text("""
        SELECT i.id, i.filename, i.year, i.sheet_count, i.row_count,
               i.uploaded_at, u.username as uploaded_by_name
        FROM patch_planning_imports i
        LEFT JOIN users u ON u.id = i.uploaded_by
        ORDER BY i.uploaded_at DESC
        LIMIT 50
    """)).fetchall()


def _render_import_page(request, db, user, perms, current_import, **extra):
    """Render patching_import.html with the context shared by both HTML pages.

    *extra* entries are added to the template context as-is.
    """
    imports = _list_imports(db)
    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "imports": imports,
        "current_import": current_import,
        **extra,
        "can_import": _can_import(perms),
        "msg": request.query_params.get("msg"),
        "err": request.query_params.get("err"),
    })
    return templates.TemplateResponse("patching_import.html", ctx)


def _row_to_json(r):
    """Turn one result row of import_sheet_json into a JSON-ready dict."""
    out = {}
    for field in _ROW_JSON_FIELDS:
        value = getattr(r, field)
        if field in _ROW_DATE_FIELDS:
            value = value.isoformat() if value else None
        out[field] = value
    return out


def _row_params(import_id, sheet_name, week_num, rec, asset_name, server_id):
    """Build the bind parameters of _INSERT_ROW_SQL for one parsed record."""
    params = {bind: _text_or_none(rec.get(col)) for bind, col in _TEXT_BINDS.items()}
    params.update({
        "imp": import_id,
        "sn": sheet_name,
        "wn": week_num,
        "ri": rec["row_index"],
        "an": asset_name,
        "jr": _coerce_date(rec.get("jour")),
        "pb": _coerce_bool(rec.get("pb_espace_disque")),
        "dpr": _coerce_date(rec.get("date_patch_realise")),
        "raw": json.dumps(rec.get("_raw") or {}, ensure_ascii=False, default=str),
        "sid": server_id,
    })
    return params


# ────────────────────────────────────────────────────────────────────────
# Pages
# ────────────────────────────────────────────────────────────────────────

@router.get("/patching/import", response_class=HTMLResponse)
async def import_index(request: Request, db=Depends(get_db)):
    """Import landing page: list of previous imports, no import selected.

    Redirects to /login when unauthenticated and to /dashboard when the user
    may not view planning or campaigns.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not _can_view_imports(perms):
        return RedirectResponse(url="/dashboard")
    return _render_import_page(request, db, user, perms, None)


@router.get("/patching/import/{import_id}", response_class=HTMLResponse)
async def import_view(request: Request, import_id: int, db=Depends(get_db)):
    """Page of one import: its metadata plus the list of its week sheets.

    Same access rules as import_index; an unknown id redirects to the landing
    page with ``err=notfound``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not _can_view_imports(perms):
        return RedirectResponse(url="/dashboard")

    imp = db.execute(text("""
        SELECT i.*, u.username as uploaded_by_name
        FROM patch_planning_imports i
        LEFT JOIN users u ON u.id = i.uploaded_by
        WHERE i.id = :id
    """), {"id": import_id}).fetchone()
    if not imp:
        return RedirectResponse(url="/patching/import?err=notfound")

    # One entry per sheet that has stored rows, with its row count.
    sheets = db.execute(text("""
        SELECT sheet_name, week_number, COUNT(*) as nb
        FROM patch_planning_import_rows
        WHERE import_id = :id
        GROUP BY sheet_name, week_number
        ORDER BY week_number NULLS LAST, sheet_name
    """), {"id": import_id}).fetchall()

    return _render_import_page(request, db, user, perms, imp, sheets=sheets)


# ────────────────────────────────────────────────────────────────────────
# JSON: rows of one sheet
# ────────────────────────────────────────────────────────────────────────

@router.get("/patching/import/{import_id}/sheet/{sheet_name}")
async def import_sheet_json(request: Request, import_id: int, sheet_name: str,
                            db=Depends(get_db)):
    """Return the stored rows of one sheet of an import as JSON.

    Responds 401 when unauthenticated and 403 when the user lacks the view
    permission required by the HTML pages; otherwise
    ``{"ok": True, "rows": [...], "count": n}`` ordered by ``row_index``.
    """
    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    # Same view permission as import_index / import_view: being logged in is
    # not enough to read planning data.
    perms = get_user_perms(db, user)
    if not _can_view_imports(perms):
        return JSONResponse({"ok": False, "msg": "Accès refusé"}, status_code=403)

    rows = db.execute(text("""
        SELECT r.id, r.row_index, r.asset_name, r.intervenant, r.environnement,
               r.domaine, r.os, r.os_version, r.application_name, r.valideur_ra,
               r.responsable_domaine_dts, r.description, r.assistant,
               r.referent_technique, r.mode_operatoire, r.impacts, r.commentaire,
               r.base_de_donnees, r.duree_coupure, r.jour, r.heure,
               r.pb_espace_disque, r.date_patch_realise, r.server_id,
               s.hostname as resolved_hostname
        FROM patch_planning_import_rows r
        LEFT JOIN servers s ON s.id = r.server_id
        WHERE r.import_id = :id AND r.sheet_name = :sn
        ORDER BY r.row_index
    """), {"id": import_id, "sn": sheet_name}).fetchall()

    out = [_row_to_json(r) for r in rows]
    return JSONResponse({"ok": True, "rows": out, "count": len(out)})


# ────────────────────────────────────────────────────────────────────────
# Upload
# ────────────────────────────────────────────────────────────────────────

@router.post("/patching/import/upload")
async def import_upload(request: Request, db=Depends(get_db),
                        file: UploadFile = File(...), note: str = Form("")):
    """Import an uploaded .xlsx planning: one header row plus one row per server.

    Only sheets whose name matches SHEET_WEEK_RE are read, and rows without an
    asset name are skipped. The header, all rows and the final counters are
    written in a single transaction, so a failure leaves nothing behind.

    Always answers with a 303 redirect: to /login, to the landing page with
    ``err=denied|ext|openpyxl_missing|parse``, or to the new import's page
    with ``msg=ok``. Database or sheet-reading errors are re-raised after a
    rollback.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not _can_import(perms):
        return RedirectResponse(url="/patching/import?err=denied", status_code=303)

    fname = file.filename or "import.xlsx"
    if not fname.lower().endswith(".xlsx"):
        return RedirectResponse(url="/patching/import?err=ext", status_code=303)

    # Imported lazily so a missing openpyxl is reported on the page.
    try:
        import openpyxl
    except ImportError:
        return RedirectResponse(url="/patching/import?err=openpyxl_missing", status_code=303)

    content = await file.read()
    try:
        wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
    except Exception as e:
        print(f"[import_upload] load_workbook failed: {e}")
        return RedirectResponse(url="/patching/import?err=parse", status_code=303)

    # The year is taken from the first "20xx" found in the file name, if any.
    year_match = re.search(r"(20\d{2})", fname)
    year = int(year_match.group(1)) if year_match else None

    try:
        # RETURNING yields the new id from the INSERT itself; no commit is
        # issued before the rows are stored, so the import is all-or-nothing.
        import_id = db.execute(text("""
            INSERT INTO patch_planning_imports
                (filename, year, sheet_count, row_count, uploaded_by, note)
            VALUES (:fn, :y, 0, 0, :uid, :nt)
            RETURNING id
        """), {"fn": fname, "y": year, "uid": user.get("uid"), "nt": note or None}).scalar()

        # Pre-load hostname -> server id (lower-cased, stripped) for resolution.
        hostname_map = {
            r.hostname.lower().strip(): r.id
            for r in db.execute(text("SELECT id, hostname FROM servers")).fetchall()
            if r.hostname
        }

        sheet_count = 0
        row_count = 0
        for sheet_name in wb.sheetnames:
            m = SHEET_WEEK_RE.match(sheet_name.strip())
            if not m:
                # "Histo-XXX" and other non-week sheets are ignored.
                continue
            week_num = int(m.group(1))
            _, parsed_rows = _parse_sheet(wb[sheet_name], sheet_name)

            inserted = 0
            for rec in parsed_rows:
                asset_name = (_text_or_none(rec.get("asset_name")) or "").strip()
                if not asset_name:
                    continue
                server_id = hostname_map.get(asset_name.lower())
                db.execute(_INSERT_ROW_SQL, _row_params(
                    import_id, sheet_name, week_num, rec, asset_name, server_id))
                inserted += 1

            # Count only sheets that stored rows, matching the per-sheet
            # listing built from patch_planning_import_rows.
            if inserted:
                sheet_count += 1
                row_count += inserted

        db.execute(text("""
            UPDATE patch_planning_imports SET sheet_count=:s, row_count=:r WHERE id=:id
        """), {"s": sheet_count, "r": row_count, "id": import_id})
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        # Read-only workbooks keep their source open until closed.
        wb.close()

    return RedirectResponse(url=f"/patching/import/{import_id}?msg=ok", status_code=303)


# ────────────────────────────────────────────────────────────────────────
# Deletion
# ────────────────────────────────────────────────────────────────────────

@router.post("/patching/import/{import_id}/delete")
async def import_delete(request: Request, import_id: int, db=Depends(get_db)):
    """Delete an import and its rows, then redirect (303) to the landing page.

    Requires the same permission as uploading; deleting an unknown id is a
    no-op that still reports ``msg=deleted``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not _can_import(perms):
        return RedirectResponse(url="/patching/import?err=denied", status_code=303)

    # Child rows are removed explicitly; the schema is not visible from here,
    # so an ON DELETE CASCADE cannot be assumed (harmless if one exists).
    db.execute(text("DELETE FROM patch_planning_import_rows WHERE import_id=:id"),
               {"id": import_id})
    db.execute(text("DELETE FROM patch_planning_imports WHERE id=:id"), {"id": import_id})
    db.commit()
    return RedirectResponse(url="/patching/import?msg=deleted", status_code=303)