patchcenter/app/routers/planning_import.py

752 lines
32 KiB
Python

"""Router import du planning de patching depuis Excel.
Fonctionnalités :
- Upload xlsx (multi-feuilles, 1 feuille = 1 semaine S02..S52)
- Liste des imports précédents
- Affichage du contenu d'un import : sélecteur de semaine + tableau des serveurs
- Endpoints JSON pour AJAX (sélection de semaine sans rechargement)
Le module pré-patching et le patching by-step seront branchés en étape 2/3.
"""
import io
import json
import re
from datetime import date, datetime, time
from fastapi import APIRouter, Request, Depends, UploadFile, File, Form, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
# Colonnes attendues dans les feuilles Sxx (ordre = priorité, on matche par regex/lower)
# Le fichier 2026 a 12 variantes d'en-têtes selon la semaine
# (ancien format S02-S06, nouveau format DTS S07+)
KNOWN_COLUMNS = {
"asset_name": [r"asset\s*name", r"\bnom\b"],
"intervenant": [r"intervenant"],
"environnement": [r"environnement|environement"],
"domaine": [r"^domaine"],
"os": [r"^\s*os\s*$"],
"os_version": [r"version\s*os"], # matche "Version OS" et "Version OS->Nom"
"application_name": [r"logiciel", r"application", r"^nom\s*complet$"],
"valideur_ra": [r"valideur"],
"responsable_domaine_dts":[r"responsable\s*domaine"],
"description": [r"description"],
"assistant": [r"^assistant"],
"referent_technique": [r"r.f.rent\s*tech"],
"mode_operatoire": [r"mode\s*op.ratoire"],
"impacts": [r"^impact"],
"commentaire": [r"commentaire"],
"base_de_donnees": [r"base\s*de\s*donn"],
"duree_coupure": [r"dur.+coupure"],
"jour": [r"^\s*jour\s*$", r"^\s*date\s*$", r"date\s*pr.+vis"],
"heure": [r"^\s*heure"],
"pb_espace_disque": [r"espace\s*disque"],
"date_patch_realise": [r"date\s*du?\s*patch.+r.+alis"],
}
SHEET_WEEK_RE = re.compile(r"^S\s*0?(\d+)$", re.IGNORECASE)
def _can_import(perms):
"""Droit d'importer = niveau edit/admin sur planning ou campaigns."""
return can_edit(perms, "planning") or can_edit(perms, "campaigns")
def _to_iso(v):
if v is None:
return None
if isinstance(v, (datetime, date)):
return v.isoformat()
return str(v)
def _coerce_date(v):
if v is None or v == "":
return None
if isinstance(v, datetime):
return v.date()
if isinstance(v, date):
return v
return None
def _coerce_date_or_text(v):
"""Renvoie (date|None, fallback_text|None).
Si v est un datetime/date → (date, None).
Si v est une string parseable dd/mm/yyyy → (date, None).
Si v est une string non parseable (ex: "A partir du 14/01") → (None, str).
"""
if v is None or v == "":
return None, None
if isinstance(v, datetime):
return v.date(), None
if isinstance(v, date):
return v, None
if isinstance(v, str):
s = v.strip()
if not s:
return None, None
m = re.match(r"^(\d{1,2})/(\d{1,2})/(\d{2,4})$", s)
if m:
try:
d = int(m.group(1)); mo = int(m.group(2)); y = int(m.group(3))
if y < 100:
y += 2000
return date(y, mo, d), None
except ValueError:
pass
return None, s
return None, str(v)
def _coerce_time(v):
"""Renvoie un datetime.time ou None.
Accepte: time, datetime, str '9H00', '12h30', '9:00', '14:00:00'.
"""
if v is None or v == "":
return None
if isinstance(v, time):
return v
if isinstance(v, datetime):
return v.time()
if isinstance(v, str):
s = v.strip()
m = re.match(r"^(\d{1,2})[Hh:](\d{2})(?::(\d{2}))?$", s)
if m:
try:
hh = int(m.group(1)); mm = int(m.group(2)); ss = int(m.group(3) or 0)
return time(hh, mm, ss)
except ValueError:
return None
return None
def _format_heure(v):
"""Renvoie une chaîne 'HHhMM' style SANEF pour affichage.
Si v est string, on garde tel quel (gère les cas '9H00 et 14H00').
Si v est time/datetime, on synthétise '9H00'.
"""
if v is None or v == "":
return None
if isinstance(v, str):
return v.strip()
if isinstance(v, time):
return f"{v.hour}H{v.minute:02d}"
if isinstance(v, datetime):
return f"{v.hour}H{v.minute:02d}"
return str(v)
def _coerce_bool(v):
if v is None or v == "":
return None
if isinstance(v, bool):
return v
s = str(v).strip().lower()
if s in ("true", "vrai", "oui", "yes", "1", "x"):
return True
if s in ("false", "faux", "non", "no", "0"):
return False
return None
def _norm_header(h):
if h is None:
return ""
return re.sub(r"\s+", " ", str(h)).strip().lower()
def _build_column_map(headers):
"""Mappe l'index de colonne → nom logique (asset_name, intervenant, ...)."""
col_map = {}
used_logical = set()
for idx, h in enumerate(headers):
norm = _norm_header(h)
if not norm:
continue
for logical, patterns in KNOWN_COLUMNS.items():
if logical in used_logical:
continue
for pat in patterns:
if re.search(pat, norm):
col_map[idx] = logical
used_logical.add(logical)
break
if logical in used_logical:
break
return col_map
def _parse_sheet(ws, sheet_name):
"""Parse une feuille xlsx → liste de dict {logical_col: value, _raw: {header: value}}."""
rows_iter = ws.iter_rows(values_only=True)
try:
headers = next(rows_iter)
except StopIteration:
return [], []
headers = [h for h in headers]
col_map = _build_column_map(headers)
parsed = []
for ridx, row in enumerate(rows_iter, start=1):
if row is None:
continue
if all(c is None or (isinstance(c, str) and not c.strip()) for c in row):
continue
rec = {"row_index": ridx}
raw = {}
for cidx, val in enumerate(row):
header = headers[cidx] if cidx < len(headers) else f"col_{cidx}"
header_str = _norm_header(header) or f"col_{cidx}"
raw[header_str] = _to_iso(val)
if cidx in col_map:
rec[col_map[cidx]] = val
rec["_raw"] = raw
parsed.append(rec)
return headers, parsed
def _list_imports(db):
return db.execute(text("""
SELECT i.id, i.filename, i.year, i.sheet_count, i.row_count,
i.uploaded_at, u.username as uploaded_by_name
FROM patch_planning_imports i
LEFT JOIN users u ON u.id = i.uploaded_by
ORDER BY i.uploaded_at DESC
LIMIT 50
""")).fetchall()
# ────────────────────────────────────────────────────────────────────────
# Pages
# ────────────────────────────────────────────────────────────────────────
@router.get("/patching/import", response_class=HTMLResponse)
async def import_index(request: Request, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not (can_view(perms, "planning") or can_view(perms, "campaigns")):
return RedirectResponse(url="/dashboard")
imports = _list_imports(db)
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME,
"imports": imports,
"current_import": None,
"can_import": _can_import(perms),
"msg": request.query_params.get("msg"),
"err": request.query_params.get("err"),
})
return templates.TemplateResponse("patching_import.html", ctx)
@router.get("/patching/import/{import_id}", response_class=HTMLResponse)
async def import_view(request: Request, import_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not (can_view(perms, "planning") or can_view(perms, "campaigns")):
return RedirectResponse(url="/dashboard")
imp = db.execute(text("""
SELECT i.*, u.username as uploaded_by_name
FROM patch_planning_imports i
LEFT JOIN users u ON u.id = i.uploaded_by
WHERE i.id = :id
"""), {"id": import_id}).fetchone()
if not imp:
return RedirectResponse(url="/patching/import?err=notfound")
sheets = db.execute(text("""
SELECT sheet_name, week_number, COUNT(*) as nb
FROM patch_planning_import_rows
WHERE import_id = :id
GROUP BY sheet_name, week_number
ORDER BY week_number NULLS LAST, sheet_name
"""), {"id": import_id}).fetchall()
imports = _list_imports(db)
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME,
"imports": imports,
"current_import": imp,
"sheets": sheets,
"can_import": _can_import(perms),
"msg": request.query_params.get("msg"),
"err": request.query_params.get("err"),
})
return templates.TemplateResponse("patching_import.html", ctx)
# ────────────────────────────────────────────────────────────────────────
# JSON : rows d'une feuille
# ────────────────────────────────────────────────────────────────────────
@router.get("/patching/import/{import_id}/sheet/{sheet_name}")
async def import_sheet_json(request: Request, import_id: int, sheet_name: str,
db=Depends(get_db)):
user = get_current_user(request)
if not user:
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
rows = db.execute(text("""
SELECT r.id, r.row_index, r.asset_name, r.intervenant, r.environnement,
r.domaine, r.os, r.os_version, r.application_name,
r.valideur_ra, r.responsable_domaine_dts,
r.description, r.assistant, r.referent_technique,
r.mode_operatoire, r.impacts, r.commentaire, r.base_de_donnees,
r.duree_coupure, r.jour, r.jour_text, r.heure, r.heure_t,
r.pb_espace_disque, r.date_patch_realise,
r.is_eligible, r.reported_to_sheet, r.report_reason,
r.server_id, s.hostname as resolved_hostname,
(r.jour + COALESCE(r.heure_t, TIME '00:00:00')) AS start_at
FROM patch_planning_import_rows r
LEFT JOIN servers s ON s.id = r.server_id
WHERE r.import_id = :id AND r.sheet_name = :sn
ORDER BY r.row_index
"""), {"id": import_id, "sn": sheet_name}).fetchall()
out = []
for r in rows:
out.append({
"id": r.id,
"row_index": r.row_index,
"asset_name": r.asset_name,
"intervenant": r.intervenant,
"environnement": r.environnement,
"domaine": r.domaine,
"os": r.os,
"os_version": r.os_version,
"application_name": r.application_name,
"valideur_ra": r.valideur_ra,
"responsable_domaine_dts": r.responsable_domaine_dts,
"description": r.description,
"assistant": r.assistant,
"referent_technique": r.referent_technique,
"mode_operatoire": r.mode_operatoire,
"impacts": r.impacts,
"commentaire": r.commentaire,
"base_de_donnees": r.base_de_donnees,
"duree_coupure": r.duree_coupure,
"jour": r.jour.isoformat() if r.jour else None,
"jour_text": r.jour_text,
"heure": r.heure,
"heure_t": r.heure_t.strftime("%H:%M:%S") if r.heure_t else None,
"start_iso": r.start_at.isoformat() if r.start_at else None,
"pb_espace_disque": r.pb_espace_disque,
"date_patch_realise": r.date_patch_realise.isoformat() if r.date_patch_realise else None,
"is_eligible": r.is_eligible,
"reported_to_sheet": r.reported_to_sheet,
"report_reason": r.report_reason,
"server_id": r.server_id,
"resolved_hostname": r.resolved_hostname,
})
return JSONResponse({"ok": True, "rows": out, "count": len(out)})
# ────────────────────────────────────────────────────────────────────────
# Action sur les rows : éligible / report
# ────────────────────────────────────────────────────────────────────────
@router.post("/patching/import/{import_id}/rows/action")
async def import_rows_action(request: Request, import_id: int, db=Depends(get_db)):
"""Pose une action sur N rows :
- action='eligible' : marque is_eligible=true (et clear report)
- action='unset_eligible' : remet is_eligible=false
- action='report' : reported_to_sheet=target_sheet, reason=... (clear is_eligible)
- action='unset_report' : clear report
"""
user = get_current_user(request)
if not user:
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
perms = get_user_perms(db, user)
if not _can_import(perms):
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
body = await request.json()
row_ids = [int(x) for x in body.get("row_ids", []) if str(x).isdigit()]
action = (body.get("action") or "").strip()
target_sheet = (body.get("target_sheet") or "").strip()
reason = (body.get("reason") or "").strip()
if not row_ids:
return JSONResponse({"ok": False, "msg": "Aucune ligne sélectionnée"})
if action not in ("eligible", "unset_eligible", "report", "unset_report"):
return JSONResponse({"ok": False, "msg": f"Action inconnue: {action}"})
if action == "report" and not target_sheet:
return JSONResponse({"ok": False, "msg": "Semaine cible obligatoire pour reporter"})
placeholders = ",".join(str(i) for i in row_ids)
# Restreint aux rows de cet import (sécurité)
rows = db.execute(text(f"""
SELECT id FROM patch_planning_import_rows
WHERE id IN ({placeholders}) AND import_id=:imp
"""), {"imp": import_id}).fetchall()
valid_ids = [r.id for r in rows]
if not valid_ids:
return JSONResponse({"ok": False, "msg": "Aucune ligne valide pour cet import"})
valid_ph = ",".join(str(i) for i in valid_ids)
uid = user.get("uid")
details = {}
if action == "eligible":
db.execute(text(f"""
UPDATE patch_planning_import_rows
SET is_eligible=true, reported_to_sheet=NULL, report_reason=NULL,
last_action_at=NOW(), last_action_by=:uid
WHERE id IN ({valid_ph})
"""), {"uid": uid})
elif action == "unset_eligible":
db.execute(text(f"""
UPDATE patch_planning_import_rows
SET is_eligible=false, last_action_at=NOW(), last_action_by=:uid
WHERE id IN ({valid_ph})
"""), {"uid": uid})
elif action == "report":
details = {"target_sheet": target_sheet, "reason": reason}
db.execute(text(f"""
UPDATE patch_planning_import_rows
SET is_eligible=false, reported_to_sheet=:ts, report_reason=:rs,
last_action_at=NOW(), last_action_by=:uid
WHERE id IN ({valid_ph})
"""), {"ts": target_sheet, "rs": reason or None, "uid": uid})
elif action == "unset_report":
db.execute(text(f"""
UPDATE patch_planning_import_rows
SET reported_to_sheet=NULL, report_reason=NULL,
last_action_at=NOW(), last_action_by=:uid
WHERE id IN ({valid_ph})
"""), {"uid": uid})
# Log
for rid in valid_ids:
db.execute(text("""
INSERT INTO patch_planning_row_log (row_id, action, details, performed_by)
VALUES (:rid, :ac, :de, :uid)
"""), {"rid": rid, "ac": action,
"de": json.dumps(details, ensure_ascii=False) if details else None,
"uid": uid})
db.commit()
return JSONResponse({"ok": True, "updated": len(valid_ids), "action": action})
# ────────────────────────────────────────────────────────────────────────
# Upload
# ────────────────────────────────────────────────────────────────────────
@router.post("/patching/import/upload")
async def import_upload(request: Request, db=Depends(get_db),
file: UploadFile = File(...),
note: str = Form("")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login", status_code=303)
perms = get_user_perms(db, user)
if not _can_import(perms):
return RedirectResponse(url="/patching/import?err=denied", status_code=303)
fname = file.filename or "import.xlsx"
if not fname.lower().endswith(".xlsx"):
return RedirectResponse(url="/patching/import?err=ext", status_code=303)
try:
import openpyxl
except ImportError:
return RedirectResponse(url="/patching/import?err=openpyxl_missing", status_code=303)
content = await file.read()
try:
wb = openpyxl.load_workbook(io.BytesIO(content), read_only=True, data_only=True)
except Exception as e:
print(f"[import_upload] load_workbook failed: {e}")
return RedirectResponse(url="/patching/import?err=parse", status_code=303)
# Détecter l'année (depuis nom de fichier ou colonne 'jour' de la 1ère feuille semaine)
year_match = re.search(r"(20\d{2})", fname)
year = int(year_match.group(1)) if year_match else None
# Insert header
db.execute(text("""
INSERT INTO patch_planning_imports (filename, year, sheet_count, row_count, uploaded_by, note)
VALUES (:fn, :y, 0, 0, :uid, :nt)
"""), {"fn": fname, "y": year, "uid": user.get("uid"), "nt": note or None})
db.commit()
import_id = db.execute(text("SELECT lastval()")).scalar()
sheet_count = 0
row_count = 0
# Pré-charge mapping hostname → server_id pour résolution
hostname_map = {}
for r in db.execute(text("SELECT id, hostname FROM servers")).fetchall():
if r.hostname:
hostname_map[r.hostname.lower().strip()] = r.id
for sheet_name in wb.sheetnames:
m = SHEET_WEEK_RE.match(sheet_name.strip())
if not m:
# On ignore les feuilles "Histo-XXX" et autres non-semaines
continue
week_num = int(m.group(1))
ws = wb[sheet_name]
_, parsed_rows = _parse_sheet(ws, sheet_name)
if not parsed_rows:
continue
sheet_count += 1
for rec in parsed_rows:
asset = rec.get("asset_name")
asset_str = str(asset).strip() if asset else None
if not asset_str:
continue
sid = hostname_map.get(asset_str.lower())
jour_d, jour_t = _coerce_date_or_text(rec.get("jour"))
heure_t = _coerce_time(rec.get("heure"))
heure_disp = _format_heure(rec.get("heure"))
db.execute(text("""
INSERT INTO patch_planning_import_rows (
import_id, sheet_name, week_number, row_index,
asset_name, intervenant, environnement, domaine, os, os_version,
application_name, valideur_ra, responsable_domaine_dts,
description, assistant, referent_technique, mode_operatoire, impacts,
commentaire, base_de_donnees,
duree_coupure, jour, jour_text, heure, heure_t,
pb_espace_disque, date_patch_realise,
raw_data, server_id
) VALUES (
:imp, :sn, :wn, :ri,
:an, :it, :en, :do, :os, :ov,
:ap, :vr, :rd,
:de, :as_, :rt, :mo, :im,
:co, :bdd,
:dc, :jr, :jt, :hr, :ht,
:pb, :dpr,
:raw, :sid
)
"""), {
"imp": import_id, "sn": sheet_name, "wn": week_num, "ri": rec["row_index"],
"an": asset_str,
"it": str(rec.get("intervenant")) if rec.get("intervenant") else None,
"en": str(rec.get("environnement")) if rec.get("environnement") else None,
"do": str(rec.get("domaine")) if rec.get("domaine") else None,
"os": str(rec.get("os")) if rec.get("os") else None,
"ov": str(rec.get("os_version")) if rec.get("os_version") else None,
"ap": str(rec.get("application_name")) if rec.get("application_name") else None,
"vr": str(rec.get("valideur_ra")) if rec.get("valideur_ra") else None,
"rd": str(rec.get("responsable_domaine_dts")) if rec.get("responsable_domaine_dts") else None,
"de": str(rec.get("description")) if rec.get("description") else None,
"as_": str(rec.get("assistant")) if rec.get("assistant") else None,
"rt": str(rec.get("referent_technique")) if rec.get("referent_technique") else None,
"mo": str(rec.get("mode_operatoire")) if rec.get("mode_operatoire") else None,
"im": str(rec.get("impacts")) if rec.get("impacts") else None,
"co": str(rec.get("commentaire")) if rec.get("commentaire") else None,
"bdd": str(rec.get("base_de_donnees")) if rec.get("base_de_donnees") else None,
"dc": str(rec.get("duree_coupure")) if rec.get("duree_coupure") else None,
"jr": jour_d,
"jt": jour_t,
"hr": heure_disp,
"ht": heure_t,
"pb": _coerce_bool(rec.get("pb_espace_disque")),
"dpr": _coerce_date(rec.get("date_patch_realise")),
"raw": json.dumps(rec.get("_raw") or {}, ensure_ascii=False, default=str),
"sid": sid,
})
row_count += 1
db.execute(text("""
UPDATE patch_planning_imports SET sheet_count=:s, row_count=:r WHERE id=:id
"""), {"s": sheet_count, "r": row_count, "id": import_id})
db.commit()
return RedirectResponse(url=f"/patching/import/{import_id}?msg=ok", status_code=303)
# ────────────────────────────────────────────────────────────────────────
# Suppression
# ────────────────────────────────────────────────────────────────────────
# ────────────────────────────────────────────────────────────────────────
# Workflow iexec — placeholder étape B (à compléter)
# ────────────────────────────────────────────────────────────────────────
@router.get("/patching/iexec", response_class=HTMLResponse)
async def iexec_page(request: Request, db=Depends(get_db),
row_ids: str = Query("")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not (can_view(perms, "planning") or can_view(perms, "campaigns")):
return RedirectResponse(url="/dashboard")
ids = [int(x) for x in row_ids.split(",") if x.strip().isdigit()]
rows = []
if ids:
placeholders = ",".join(str(i) for i in ids)
rows = db.execute(text(f"""
SELECT r.id, r.asset_name, r.environnement, r.domaine, r.os, r.os_version,
r.intervenant,
r.is_eligible, r.server_id,
s.hostname, vs.effective_excludes
FROM patch_planning_import_rows r
LEFT JOIN servers s ON s.id = r.server_id
LEFT JOIN v_servers vs ON vs.id = r.server_id
WHERE r.id IN ({placeholders}) AND r.is_eligible = true
""")).fetchall()
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME,
"rows": rows,
"row_ids": ids,
})
return templates.TemplateResponse("patching_iexec.html", ctx)
@router.post("/patching/iexec/check/{row_id}")
async def iexec_check(request: Request, row_id: int, db=Depends(get_db)):
"""Lance les 3 checks pré-patching (DNS, SSH, Satellite) sur 1 row éligible.
Retourne JSON avec le résultat détaillé."""
user = get_current_user(request)
if not user:
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
perms = get_user_perms(db, user)
if not (can_view(perms, "planning") or can_view(perms, "campaigns")):
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
row = db.execute(text("""
SELECT r.id, r.asset_name, r.intervenant, r.environnement, r.domaine,
r.os, r.os_version, r.is_eligible, r.server_id, s.hostname
FROM patch_planning_import_rows r
LEFT JOIN servers s ON s.id = r.server_id
WHERE r.id = :id
"""), {"id": row_id}).fetchone()
if not row:
return JSONResponse({"ok": False, "msg": "Ligne introuvable"}, status_code=404)
if not row.is_eligible:
return JSONResponse({"ok": False, "msg": "Ligne non éligible"}, status_code=400)
# Workflow yum/Satellite = Linux uniquement
os_str = str(row.os or "").lower()
if "windows" in os_str or os_str.strip() == "win":
return JSONResponse({
"ok": True, "row_id": row_id,
"hostname": row.asset_name, "target": None,
"overall": "unsupported",
"checks": [],
"skipped_reason": f"OS '{row.os}' non concerné — workflow Linux uniquement",
})
hostname = (row.hostname or row.asset_name or "").strip()
if not hostname:
return JSONResponse({"ok": False, "msg": "Pas de hostname"}, status_code=400)
from ..services.prepatch_check_service import run_all_checks
result = run_all_checks(hostname, row={
"asset_name": row.asset_name,
"intervenant": row.intervenant,
"environnement": row.environnement,
"domaine": row.domaine,
})
return JSONResponse({"ok": True, "row_id": row_id, **result})
@router.post("/patching/iexec/snapshot/{row_id}")
async def iexec_snapshot(request: Request, row_id: int, db=Depends(get_db)):
"""Step 2 — prend un snapshot vCenter pour 1 row éligible Linux.
Nom snapshot : <intervenant>_<YYYY-MM-DD>_avant_patch.
Réutilise quickwin_snapshot_service.snapshot_server."""
user = get_current_user(request)
if not user:
return JSONResponse({"ok": False, "detail": "Non authentifié"}, status_code=401)
perms = get_user_perms(db, user)
if not _can_import(perms):
return JSONResponse({"ok": False, "detail": "Permission refusée"}, status_code=403)
row = db.execute(text("""
SELECT r.id, r.asset_name, r.intervenant, r.environnement, r.os, r.is_eligible,
s.hostname, s.vcenter_vm_name
FROM patch_planning_import_rows r
LEFT JOIN servers s ON s.id = r.server_id
WHERE r.id = :id
"""), {"id": row_id}).fetchone()
if not row:
return JSONResponse({"ok": False, "detail": "Ligne introuvable"}, status_code=404)
if not row.is_eligible:
return JSONResponse({"ok": False, "detail": "Ligne non éligible"}, status_code=400)
os_str = str(row.os or "").lower()
if "windows" in os_str:
return JSONResponse({"ok": False, "detail": "OS Windows non géré"}, status_code=400)
hostname = (row.hostname or row.asset_name or "").strip()
if not hostname:
return JSONResponse({"ok": False, "detail": "Pas de hostname"}, status_code=400)
# Branche prod / hprod basée sur le préfixe hostname (convention SANEF) :
# vp/sp/lp → prod → vCenter "metier" (Nanterre, vpmetavcs1)
# le reste → hprod → vCenter "gestion" (Senlis, vpgesavcs1)
# Plus fiable que la colonne environnement Excel (peut être bug-prone).
prefix = (hostname.split(".")[0] or "").lower()[:2]
PROD_PREFIXES = ("vp", "sp", "lp")
branch = "prod" if prefix in PROD_PREFIXES else "hprod"
# Nom snapshot : <intervenant>_<YYYY-MM-DD>_avant_patch
intervenant = (row.intervenant or "patcheur").strip().replace(" ", "_")
today = datetime.now().strftime("%Y-%m-%d")
snap_name = f"{intervenant}_{today}_avant_patch"
# On cherche la VM dans vCenter par son hostname (pas par s.vcenter_vm_name
# qui peut être faux en base). Si plus tard on a un cas où la VM porte un
# nom différent, on rajoutera un mapping explicite.
vm_name = hostname
from ..services.quickwin_snapshot_service import snapshot_server
result = snapshot_server(hostname, vm_name, branch, db, snap_name=snap_name)
# result = {ok, vcenter, detail, skipped?}
# Audit log
try:
db.execute(text("""
INSERT INTO patch_planning_row_log (row_id, action, details, performed_by)
VALUES (:rid, 'snapshot', :de, :uid)
"""), {"rid": row_id,
"de": json.dumps({**result, "snap_name": snap_name, "branch": branch,
"vm_name": vm_name},
ensure_ascii=False),
"uid": user.get("uid")})
db.commit()
except Exception as e:
log_msg = f"audit log snapshot failed: {e}"
print(f"[iexec_snapshot] {log_msg}")
result.update({
"row_id": row_id,
"snap_name": snap_name,
"branch": branch,
"vm_name": vm_name,
})
return JSONResponse(result)
@router.post("/patching/import/{import_id}/delete")
async def import_delete(request: Request, import_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login", status_code=303)
perms = get_user_perms(db, user)
if not _can_import(perms):
return RedirectResponse(url="/patching/import?err=denied", status_code=303)
db.execute(text("DELETE FROM patch_planning_imports WHERE id=:id"), {"id": import_id})
db.commit()
return RedirectResponse(url="/patching/import?msg=deleted", status_code=303)