498 lines
20 KiB
Python
498 lines
20 KiB
Python
"""Router for importing the patching planning from Excel.

Features:
- xlsx upload (multi-sheet, 1 sheet = 1 week, S02..S52)
- List of previous imports
- Display of an import's content: week selector + table of servers
- JSON endpoints for AJAX (week selection without page reload)

The pre-patching module and the step-by-step patching will be plugged in
at stage 2/3.
"""
|
|
import io
|
|
import json
|
|
import re
|
|
from datetime import date, datetime, time
|
|
from fastapi import APIRouter, Request, Depends, UploadFile, File, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context
|
|
from ..config import APP_NAME
|
|
|
|
# Router on which every endpoint of this module is registered.
router = APIRouter()
# Jinja2 environment; templates are looked up under "app/templates".
templates = Jinja2Templates(directory="app/templates")

# Expected columns in the Sxx sheets (order = priority; headers are matched
# by regex against the lower-cased header text).
# The 2026 file has 12 header variants depending on the week
# (old format S02-S06, new DTS format S07+).
# Several patterns use "." where a letter may or may not carry an accent, so
# both spellings of the header match.
KNOWN_COLUMNS = {
    "asset_name": [r"asset\s*name", r"\bnom\b"],
    "intervenant": [r"intervenant"],
    "environnement": [r"environnement|environement"],
    "domaine": [r"^domaine"],
    "os": [r"^\s*os\s*$"],
    "os_version": [r"version\s*os"], # matches "Version OS" and "Version OS->Nom"
    "application_name": [r"logiciel", r"application", r"^nom\s*complet$"],
    "valideur_ra": [r"valideur"],
    "responsable_domaine_dts":[r"responsable\s*domaine"],
    "description": [r"description"],
    "assistant": [r"^assistant"],
    "referent_technique": [r"r.f.rent\s*tech"],
    "mode_operatoire": [r"mode\s*op.ratoire"],
    "impacts": [r"^impact"],
    "commentaire": [r"commentaire"],
    "base_de_donnees": [r"base\s*de\s*donn"],
    "duree_coupure": [r"dur.+coupure"],
    "jour": [r"^\s*jour\s*$", r"^\s*date\s*$", r"date\s*pr.+vis"],
    "heure": [r"^\s*heure"],
    "pb_espace_disque": [r"espace\s*disque"],
    "date_patch_realise": [r"date\s*du?\s*patch.+r.+alis"],
}

# Week-sheet names: "S", optional whitespace, at most one skipped leading
# zero, then the digits captured as group 1 (e.g. "S02", "s 7", "S52").
# Matching is case-insensitive.
SHEET_WEEK_RE = re.compile(r"^S\s*0?(\d+)$", re.IGNORECASE)
|
|
|
|
|
|
def _can_import(perms):
    """Tell whether *perms* allows importing a planning file.

    Importing is allowed when ``can_edit`` accepts either the "planning" or
    the "campaigns" module.
    """
    planning_ok = can_edit(perms, "planning")
    # Same short-circuit as ``a or b``: campaigns is only consulted when the
    # planning check is falsy.
    return planning_ok if planning_ok else can_edit(perms, "campaigns")
|
|
|
|
|
|
def _to_iso(v):
|
|
if v is None:
|
|
return None
|
|
if isinstance(v, (datetime, date)):
|
|
return v.isoformat()
|
|
return str(v)
|
|
|
|
|
|
def _coerce_date(v):
|
|
if v is None or v == "":
|
|
return None
|
|
if isinstance(v, datetime):
|
|
return v.date()
|
|
if isinstance(v, date):
|
|
return v
|
|
return None
|
|
|
|
|
|
def _coerce_date_or_text(v):
|
|
"""Renvoie (date|None, fallback_text|None).
|
|
Si v est un datetime/date → (date, None).
|
|
Si v est une string parseable dd/mm/yyyy → (date, None).
|
|
Si v est une string non parseable (ex: "A partir du 14/01") → (None, str).
|
|
"""
|
|
if v is None or v == "":
|
|
return None, None
|
|
if isinstance(v, datetime):
|
|
return v.date(), None
|
|
if isinstance(v, date):
|
|
return v, None
|
|
if isinstance(v, str):
|
|
s = v.strip()
|
|
if not s:
|
|
return None, None
|
|
m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{2,4})$", s)
|
|
if m:
|
|
try:
|
|
d = int(m.group(1)); mo = int(m.group(2)); y = int(m.group(3))
|
|
if y < 100:
|
|
y += 2000
|
|
return date(y, mo, d), None
|
|
except ValueError:
|
|
pass
|
|
return None, s
|
|
return None, str(v)
|
|
|
|
|
|
def _coerce_time(v):
|
|
"""Renvoie un datetime.time ou None.
|
|
Accepte: time, datetime, str '9H00', '12h30', '9:00', '14:00:00'.
|
|
"""
|
|
if v is None or v == "":
|
|
return None
|
|
if isinstance(v, time):
|
|
return v
|
|
if isinstance(v, datetime):
|
|
return v.time()
|
|
if isinstance(v, str):
|
|
s = v.strip()
|
|
m = re.match(r"^(\d{1,2})[Hh:](\d{2})(?::(\d{2}))?$", s)
|
|
if m:
|
|
try:
|
|
hh = int(m.group(1)); mm = int(m.group(2)); ss = int(m.group(3) or 0)
|
|
return time(hh, mm, ss)
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _format_heure(v):
|
|
"""Renvoie une chaîne 'HHhMM' style SANEF pour affichage.
|
|
Si v est string, on garde tel quel (gère les cas '9H00 et 14H00').
|
|
Si v est time/datetime, on synthétise '9H00'.
|
|
"""
|
|
if v is None or v == "":
|
|
return None
|
|
if isinstance(v, str):
|
|
return v.strip()
|
|
if isinstance(v, time):
|
|
return f"{v.hour}H{v.minute:02d}"
|
|
if isinstance(v, datetime):
|
|
return f"{v.hour}H{v.minute:02d}"
|
|
return str(v)
|
|
|
|
|
|
def _coerce_bool(v):
|
|
if v is None or v == "":
|
|
return None
|
|
if isinstance(v, bool):
|
|
return v
|
|
s = str(v).strip().lower()
|
|
if s in ("true", "vrai", "oui", "yes", "1", "x"):
|
|
return True
|
|
if s in ("false", "faux", "non", "no", "0"):
|
|
return False
|
|
return None
|
|
|
|
|
|
def _norm_header(h):
|
|
if h is None:
|
|
return ""
|
|
return re.sub(r"\s+", " ", str(h)).strip().lower()
|
|
|
|
|
|
def _build_column_map(headers):
    """Map column index -> logical name (asset_name, intervenant, ...).

    Headers are visited left to right. Each one is normalised with
    ``_norm_header`` and tested against ``KNOWN_COLUMNS`` in dict order; the
    first logical name that is still unassigned and has a matching pattern
    claims the column. A logical name is assigned at most once, and columns
    with an empty header or no match are left out of the result.
    """
    # NOTE(review): because columns are scanned left to right, a loose
    # pattern such as r"\bnom\b" can claim a header like "version os->nom"
    # when that column precedes the real asset column. Confirm against the
    # actual sheet layouts.
    mapping = {}
    assigned = set()
    for position, header in enumerate(headers):
        label = _norm_header(header)
        if not label:
            continue
        for logical, patterns in KNOWN_COLUMNS.items():
            if logical in assigned:
                continue
            if any(re.search(pattern, label) for pattern in patterns):
                mapping[position] = logical
                assigned.add(logical)
                break
    return mapping
|
|
|
|
|
|
def _parse_sheet(ws, sheet_name):
    """Read one worksheet and return ``(headers, records)``.

    The first row is taken as the header row. Each later row that is not
    entirely blank becomes one record dict holding:

      * ``row_index`` - 1-based position among the rows after the header
        (blank rows are skipped but still counted);
      * one entry per recognised logical column, with the raw cell value;
      * ``_raw`` - every cell of the row keyed by its normalised header
        (``col_<n>`` when the header is empty or missing), with values
        passed through ``_to_iso``. Columns sharing the same normalised
        header overwrite one another, so the last one wins.

    An empty worksheet yields ``([], [])``. ``sheet_name`` is not used by
    the body.
    """
    rows = ws.iter_rows(values_only=True)
    missing = object()
    first_row = next(rows, missing)
    if first_row is missing:
        return [], []

    headers = list(first_row)
    logical_by_col = _build_column_map(headers)
    header_count = len(headers)

    def is_blank(cell):
        return cell is None or (isinstance(cell, str) and not cell.strip())

    records = []
    for position, cells in enumerate(rows, start=1):
        if cells is None or all(map(is_blank, cells)):
            continue
        record = {"row_index": position}
        raw = {}
        for col, value in enumerate(cells):
            title = headers[col] if col < header_count else f"col_{col}"
            raw[_norm_header(title) or f"col_{col}"] = _to_iso(value)
            logical = logical_by_col.get(col)
            if logical is not None:
                record[logical] = value
        record["_raw"] = raw
        records.append(record)
    return headers, records
|
|
|
|
|
|
def _list_imports(db):
    """Fetch the 50 most recently uploaded imports, newest first.

    Each row carries the import's id, filename, year, sheet/row counts and
    upload timestamp, plus the uploader's username as ``uploaded_by_name``
    (NULL when no matching user exists, because of the LEFT JOIN).
    """
    query = text("""
        SELECT i.id, i.filename, i.year, i.sheet_count, i.row_count,
               i.uploaded_at, u.username as uploaded_by_name
        FROM patch_planning_imports i
        LEFT JOIN users u ON u.id = i.uploaded_by
        ORDER BY i.uploaded_at DESC
        LIMIT 50
    """)
    result = db.execute(query)
    return result.fetchall()
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
# Pages
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
@router.get("/patching/import", response_class=HTMLResponse)
async def import_index(request: Request, db=Depends(get_db)):
    """Render the import landing page with no import selected.

    Anonymous visitors are redirected to /login. Users who can view neither
    "planning" nor "campaigns" are redirected to /dashboard. The ``msg`` and
    ``err`` query parameters are forwarded to the template.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    perms = get_user_perms(db, user)
    may_view = can_view(perms, "planning") or can_view(perms, "campaigns")
    if not may_view:
        return RedirectResponse(url="/dashboard")

    recent_imports = _list_imports(db)
    ctx = base_context(request, db, user)
    query = request.query_params
    extras = {
        "app_name": APP_NAME,
        "imports": recent_imports,
        "current_import": None,
        "can_import": _can_import(perms),
        "msg": query.get("msg"),
        "err": query.get("err"),
    }
    ctx.update(extras)
    return templates.TemplateResponse("patching_import.html", ctx)
|
|
|
|
|
|
@router.get("/patching/import/{import_id}", response_class=HTMLResponse)
async def import_view(request: Request, import_id: int, db=Depends(get_db)):
    """Render the import page with one import selected.

    Applies the same access rules as the landing page (login required, view
    right on "planning" or "campaigns"). An unknown ``import_id`` redirects
    to the landing page with ``err=notfound``. The template receives the
    import header row and, per sheet, its name, week number and row count.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    perms = get_user_perms(db, user)
    may_view = can_view(perms, "planning") or can_view(perms, "campaigns")
    if not may_view:
        return RedirectResponse(url="/dashboard")

    by_id = {"id": import_id}

    header_query = text("""
        SELECT i.*, u.username as uploaded_by_name
        FROM patch_planning_imports i
        LEFT JOIN users u ON u.id = i.uploaded_by
        WHERE i.id = :id
    """)
    imp = db.execute(header_query, by_id).fetchone()
    if not imp:
        return RedirectResponse(url="/patching/import?err=notfound")

    # One line per sheet; sheets without a week number sort last.
    sheets_query = text("""
        SELECT sheet_name, week_number, COUNT(*) as nb
        FROM patch_planning_import_rows
        WHERE import_id = :id
        GROUP BY sheet_name, week_number
        ORDER BY week_number NULLS LAST, sheet_name
    """)
    sheets = db.execute(sheets_query, by_id).fetchall()

    recent_imports = _list_imports(db)
    ctx = base_context(request, db, user)
    query = request.query_params
    extras = {
        "app_name": APP_NAME,
        "imports": recent_imports,
        "current_import": imp,
        "sheets": sheets,
        "can_import": _can_import(perms),
        "msg": query.get("msg"),
        "err": query.get("err"),
    }
    ctx.update(extras)
    return templates.TemplateResponse("patching_import.html", ctx)
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
# JSON : rows d'une feuille
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
def _import_row_to_json(r):
    """Serialise one row returned by the sheet query into a JSON-safe dict.

    Text and integer columns are copied as-is; dates and the computed start
    timestamp are emitted as ISO strings, and the parsed time as HH:MM:SS.
    """
    return {
        "id": r.id,
        "row_index": r.row_index,
        "asset_name": r.asset_name,
        "intervenant": r.intervenant,
        "environnement": r.environnement,
        "domaine": r.domaine,
        "os": r.os,
        "os_version": r.os_version,
        "application_name": r.application_name,
        "valideur_ra": r.valideur_ra,
        "responsable_domaine_dts": r.responsable_domaine_dts,
        "description": r.description,
        "assistant": r.assistant,
        "referent_technique": r.referent_technique,
        "mode_operatoire": r.mode_operatoire,
        "impacts": r.impacts,
        "commentaire": r.commentaire,
        "base_de_donnees": r.base_de_donnees,
        "duree_coupure": r.duree_coupure,
        "jour": r.jour.isoformat() if r.jour else None,
        "jour_text": r.jour_text,
        "heure": r.heure,
        "heure_t": r.heure_t.strftime("%H:%M:%S") if r.heure_t else None,
        "start_iso": r.start_at.isoformat() if r.start_at else None,
        "pb_espace_disque": r.pb_espace_disque,
        "date_patch_realise": r.date_patch_realise.isoformat() if r.date_patch_realise else None,
        "server_id": r.server_id,
        "resolved_hostname": r.resolved_hostname,
    }


@router.get("/patching/import/{import_id}/sheet/{sheet_name}")
async def import_sheet_json(request: Request, import_id: int, sheet_name: str,
                            db=Depends(get_db)):
    """Return the rows of one sheet of an import as JSON.

    Responses:
      * 401 ``{"ok": False, ...}`` when the caller is not authenticated;
      * 403 ``{"ok": False, ...}`` when the caller can view neither
        "planning" nor "campaigns" (the rule the HTML views enforce);
      * 200 ``{"ok": True, "rows": [...], "count": n}`` otherwise, rows
        ordered by ``row_index``. An unknown import or sheet gives an empty
        list.
    """
    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)

    # Being logged in is not enough: apply the same view permission as the
    # pages that embed this endpoint.
    perms = get_user_perms(db, user)
    if not (can_view(perms, "planning") or can_view(perms, "campaigns")):
        return JSONResponse({"ok": False, "msg": "Accès refusé"}, status_code=403)

    # start_at adds the planned day and the parsed time (midnight when no
    # time was parsed); it stays NULL when the day itself is NULL.
    rows = db.execute(text("""
        SELECT r.id, r.row_index, r.asset_name, r.intervenant, r.environnement,
               r.domaine, r.os, r.os_version, r.application_name,
               r.valideur_ra, r.responsable_domaine_dts,
               r.description, r.assistant, r.referent_technique,
               r.mode_operatoire, r.impacts, r.commentaire, r.base_de_donnees,
               r.duree_coupure, r.jour, r.jour_text, r.heure, r.heure_t,
               r.pb_espace_disque, r.date_patch_realise,
               r.server_id, s.hostname as resolved_hostname,
               (r.jour + COALESCE(r.heure_t, TIME '00:00:00')) AS start_at
        FROM patch_planning_import_rows r
        LEFT JOIN servers s ON s.id = r.server_id
        WHERE r.import_id = :id AND r.sheet_name = :sn
        ORDER BY r.row_index
    """), {"id": import_id, "sn": sheet_name}).fetchall()

    out = [_import_row_to_json(r) for r in rows]
    return JSONResponse({"ok": True, "rows": out, "count": len(out)})
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
# Upload
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
# (bind parameter, logical column) pairs for the columns stored as plain text.
_UPLOAD_TEXT_PARAMS = (
    ("it", "intervenant"),
    ("en", "environnement"),
    ("do", "domaine"),
    ("os", "os"),
    ("ov", "os_version"),
    ("ap", "application_name"),
    ("vr", "valideur_ra"),
    ("rd", "responsable_domaine_dts"),
    ("de", "description"),
    ("as_", "assistant"),
    ("rt", "referent_technique"),
    ("mo", "mode_operatoire"),
    ("im", "impacts"),
    ("co", "commentaire"),
    ("bdd", "base_de_donnees"),
    ("dc", "duree_coupure"),
)


def _upload_opt_str(value):
    """Return ``str(value)`` for a truthy cell value, ``None`` otherwise."""
    return str(value) if value else None


@router.post("/patching/import/upload")
async def import_upload(request: Request, db=Depends(get_db),
                        file: UploadFile = File(...),
                        note: str = Form("")):
    """Import an uploaded .xlsx planning into the database.

    Every sheet whose name matches ``SHEET_WEEK_RE`` is parsed; each row
    with a non-empty asset name becomes one ``patch_planning_import_rows``
    record, linked to a server when the asset name equals a known hostname
    (case-insensitive). Other sheets and rows without an asset are skipped.

    All responses are 303 redirects:
      * ``/login`` when not authenticated;
      * ``/patching/import?err=denied`` without import permission;
      * ``?err=ext`` when the file name does not end with ``.xlsx``;
      * ``?err=openpyxl_missing`` when openpyxl is not installed;
      * ``?err=parse`` when the workbook cannot be opened;
      * ``/patching/import/<id>?msg=ok`` on success.

    The header row and all data rows are written in one transaction. If
    anything fails, the transaction is rolled back and the exception is
    re-raised, so no half-imported data is left behind.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not _can_import(perms):
        return RedirectResponse(url="/patching/import?err=denied", status_code=303)

    fname = file.filename or "import.xlsx"
    if not fname.lower().endswith(".xlsx"):
        return RedirectResponse(url="/patching/import?err=ext", status_code=303)

    try:
        import openpyxl
    except ImportError:
        return RedirectResponse(url="/patching/import?err=openpyxl_missing", status_code=303)

    content = await file.read()
    try:
        wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
    except Exception as e:
        # Boundary catch: any failure to open the file is reported to the
        # user as one generic parse error.
        print(f"[import_upload] load_workbook failed: {e}")
        return RedirectResponse(url="/patching/import?err=parse", status_code=303)

    # The year is taken from the file name only (first "20xx" found).
    year_match = re.search(r"(20\d{2})", fname)
    year = int(year_match.group(1)) if year_match else None

    # Built once: the statement is identical for every row.
    insert_row = text("""
        INSERT INTO patch_planning_import_rows (
            import_id, sheet_name, week_number, row_index,
            asset_name, intervenant, environnement, domaine, os, os_version,
            application_name, valideur_ra, responsable_domaine_dts,
            description, assistant, referent_technique, mode_operatoire, impacts,
            commentaire, base_de_donnees,
            duree_coupure, jour, jour_text, heure, heure_t,
            pb_espace_disque, date_patch_realise,
            raw_data, server_id
        ) VALUES (
            :imp, :sn, :wn, :ri,
            :an, :it, :en, :do, :os, :ov,
            :ap, :vr, :rd,
            :de, :as_, :rt, :mo, :im,
            :co, :bdd,
            :dc, :jr, :jt, :hr, :ht,
            :pb, :dpr,
            :raw, :sid
        )
    """)

    try:
        # RETURNING yields the new id from the INSERT itself. A separate
        # "SELECT lastval()" after a commit is not guaranteed to run on the
        # same connection.
        import_id = db.execute(text("""
            INSERT INTO patch_planning_imports (filename, year, sheet_count, row_count, uploaded_by, note)
            VALUES (:fn, :y, 0, 0, :uid, :nt)
            RETURNING id
        """), {"fn": fname, "y": year, "uid": user.get("uid"), "nt": note or None}).scalar()

        # Preload hostname -> server_id for asset resolution.
        hostname_map = {
            r.hostname.lower().strip(): r.id
            for r in db.execute(text("SELECT id, hostname FROM servers")).fetchall()
            if r.hostname
        }

        sheet_count = 0
        row_count = 0
        for sheet_name in wb.sheetnames:
            m = SHEET_WEEK_RE.match(sheet_name.strip())
            if not m:
                # "Histo-XXX" and other non-week sheets are ignored.
                continue
            week_num = int(m.group(1))
            _, parsed_rows = _parse_sheet(wb[sheet_name], sheet_name)
            if not parsed_rows:
                continue
            sheet_count += 1

            for rec in parsed_rows:
                asset = rec.get("asset_name")
                asset_str = str(asset).strip() if asset else None
                if not asset_str:
                    continue

                jour_d, jour_t = _coerce_date_or_text(rec.get("jour"))
                heure_cell = rec.get("heure")

                params = {
                    bind: _upload_opt_str(rec.get(column))
                    for bind, column in _UPLOAD_TEXT_PARAMS
                }
                params.update({
                    "imp": import_id,
                    "sn": sheet_name,
                    "wn": week_num,
                    "ri": rec["row_index"],
                    "an": asset_str,
                    "jr": jour_d,
                    "jt": jour_t,
                    "hr": _format_heure(heure_cell),
                    "ht": _coerce_time(heure_cell),
                    "pb": _coerce_bool(rec.get("pb_espace_disque")),
                    "dpr": _coerce_date(rec.get("date_patch_realise")),
                    "raw": json.dumps(rec.get("_raw") or {}, ensure_ascii=False, default=str),
                    "sid": hostname_map.get(asset_str.lower()),
                })
                db.execute(insert_row, params)
                row_count += 1

        db.execute(text("""
            UPDATE patch_planning_imports SET sheet_count=:s, row_count=:r WHERE id=:id
        """), {"s": sheet_count, "r": row_count, "id": import_id})
        db.commit()
    except Exception:
        # Undo the header and any rows already written, then let the error
        # surface as before.
        db.rollback()
        raise
    finally:
        # Read-only workbooks must be closed explicitly.
        wb.close()

    return RedirectResponse(url=f"/patching/import/{import_id}?msg=ok", status_code=303)
|
|
|
|
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
# Suppression
|
|
# ────────────────────────────────────────────────────────────────────────
|
|
|
|
@router.post("/patching/import/{import_id}/delete")
async def import_delete(request: Request, import_id: int, db=Depends(get_db)):
    """Delete one import header row, then redirect to the landing page.

    All responses are 303 redirects: to /login when not authenticated, to
    ``?err=denied`` without import permission, otherwise to ``?msg=deleted``
    (also when no row carried that id).
    """
    # NOTE(review): only the header table is targeted here; presumably the
    # child rows are removed by an ON DELETE CASCADE constraint - verify
    # against the schema.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)

    if not _can_import(get_user_perms(db, user)):
        return RedirectResponse(url="/patching/import?err=denied", status_code=303)

    statement = text("DELETE FROM patch_planning_imports WHERE id=:id")
    db.execute(statement, {"id": import_id})
    db.commit()
    return RedirectResponse(url="/patching/import?msg=deleted", status_code=303)
|