patchcenter/app/routers/safe_patching.py
Khalid MOUTAOUAKIL 49d5658475 Safe Patching wizard, SSE terminal, SSH password fallback, Qualys VMDR testé
Safe Patching Quick Win:
- Wizard 4 steps: Prérequis → Snapshot → Exécution → Post-patch
- Step 1: vérif SSH/disque/satellite par branche, exclure les KO
- Step 2: snapshot vSphere VMs
- Step 3: commande yum éditable, lancer hprod puis prod (100% requis)
- Step 4: vérification post-patch, export CSV
- Terminal SSE live (Server-Sent Events) avec couleurs
- Exclusion serveurs par checkbox dans chaque branche
- Label auto Quick Win SXX YYYY

SSH:
- Fallback password depuis settings si clé SSH absente
- Détection auto root (id -u) → pas de sudo si déjà root
- Testé sur VM doli CentOS 7 (10.0.2.4)

Qualys VMDR:
- API 2.0 testée et fonctionnelle avec compte sanef-ae
- Knowledge Base (CVE/QID/packages) accessible
- Host Detections (vulns par host) accessible
- Migration vers API 4.0 à prévoir (EOL dans 85 jours)

Qualys Agent installé sur doli (activation perso qg2)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 06:49:31 +02:00

230 lines
9.5 KiB
Python

"""Router Safe Patching — Quick Win campagnes + SSE terminal live"""
import asyncio, json
from datetime import datetime
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context
from ..services.safe_patching_service import (
create_quickwin_campaign, get_quickwin_stats, build_yum_command, build_safe_excludes,
)
from ..services.campaign_service import get_campaign, get_campaign_sessions, get_campaign_stats
from ..services.patching_executor import get_stream, start_execution, emit
from ..config import APP_NAME, DATABASE_URL
# Router that collects every /safe-patching endpoint declared in this module.
router = APIRouter()
# Jinja2 environment used by the HTML handlers below; the directory is a
# relative path, so it is resolved against the process working directory.
templates = Jinja2Templates(directory="app/templates")
# Existing Quick Win campaigns with their creator name and session counters.
_QUICKWIN_CAMPAIGNS_SQL = """
SELECT c.*, u.display_name as created_by_name,
(SELECT COUNT(*) FROM patch_sessions ps WHERE ps.campaign_id = c.id) as session_count,
(SELECT COUNT(*) FROM patch_sessions ps WHERE ps.campaign_id = c.id AND ps.status = 'patched') as patched_count
FROM campaigns c LEFT JOIN users u ON c.created_by = u.id
WHERE c.campaign_type = 'quickwin'
ORDER BY c.year DESC, c.week_code DESC
"""


@router.get("/safe-patching", response_class=HTMLResponse)
async def safe_patching_page(request: Request, db=Depends(get_db)):
    """Render the list of Quick Win campaigns and the campaign creation form.

    Anonymous visitors are redirected to ``/login``; users without the
    ``campaigns`` view permission are redirected to ``/dashboard``.

    The template context carries the quickwin campaigns, the active
    operators (for the lead/assistant selectors), the current ISO week and
    year, a ``can_create`` flag derived from the ``campaigns`` edit
    permission, and the optional ``msg`` query parameter.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    perms = get_user_perms(db, user)
    if not can_view(perms, "campaigns"):
        return RedirectResponse(url="/dashboard")

    campaign_rows = db.execute(text(_QUICKWIN_CAMPAIGNS_SQL)).fetchall()

    # Operators offered in the creation form.
    operator_rows = db.execute(text(
        "SELECT id, display_name FROM users WHERE is_active = true AND role = 'operator' ORDER BY display_name"
    )).fetchall()

    # isocalendar() yields (ISO year, ISO week number, ISO weekday).
    iso_year, iso_week, _weekday = datetime.now().isocalendar()

    page_values = {
        "app_name": APP_NAME,
        "campaigns": campaign_rows,
        "operators": operator_rows,
        "current_week": iso_week,
        "current_year": iso_year,
        "can_create": can_edit(perms, "campaigns"),
        "msg": request.query_params.get("msg"),
    }
    ctx = base_context(request, db, user)
    ctx.update(page_values)
    return templates.TemplateResponse("safe_patching.html", ctx)
@router.post("/safe-patching/create")
async def safe_patching_create(request: Request, db=Depends(get_db),
                               label: str = Form(""), week_number: str = Form("0"),
                               year: str = Form("0"), lead_id: str = Form("0"),
                               assistant_id: str = Form("")):
    """Create a Quick Win campaign from the submitted form.

    Form fields arrive as strings and are converted here:

    * ``week_number``, ``year``, ``lead_id``: integers, ``0`` when blank.
    * ``assistant_id``: integer, or ``None`` when blank.
    * ``label``: when empty, defaults to ``Quick Win S<week> <year>``.

    Returns a 303 redirect to the new campaign's page on success, to
    ``/safe-patching?msg=error`` when the form values are not numeric or the
    creation fails, to ``/login`` for anonymous users, and to
    ``/safe-patching`` for users lacking the ``campaigns`` edit permission.
    """
    import logging

    # Every redirect below answers a POST. 303 makes the browser follow up
    # with a GET; the RedirectResponse default (307) would replay the POST
    # body against the target URL.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "campaigns"):
        return RedirectResponse(url="/safe-patching", status_code=303)

    try:
        wn = int(week_number) if week_number.strip() else 0
        yr = int(year) if year.strip() else 0
        lid = int(lead_id) if lead_id.strip() else 0
        aid = int(assistant_id) if assistant_id.strip() else None
    except ValueError:
        # Malformed numeric field: report it like any other creation error
        # instead of letting the request fail with a 500.
        return RedirectResponse(url="/safe-patching?msg=error", status_code=303)

    if not label:
        label = f"Quick Win S{wn:02d} {yr}"

    try:
        cid = create_quickwin_campaign(db, yr, wn, label, lid, aid)
    except Exception:
        # Best-effort boundary: keep the user-facing redirect, but leave a
        # trace of the failure for whoever has to diagnose it.
        logging.getLogger(__name__).exception(
            "Quick Win campaign creation failed (year=%s, week=%s)", yr, wn
        )
        db.rollback()
        return RedirectResponse(url="/safe-patching?msg=error", status_code=303)
    return RedirectResponse(url=f"/safe-patching/{cid}", status_code=303)
@router.get("/safe-patching/{campaign_id}", response_class=HTMLResponse)
async def safe_patching_detail(request: Request, campaign_id: int, db=Depends(get_db)):
    """Render the wizard page of one Quick Win campaign.

    Anonymous visitors are redirected to ``/login``; an unknown
    ``campaign_id`` redirects to ``/safe-patching``.

    Sessions are split into three lists for the template: ``excluded``
    (status ``'excluded'``), ``prod`` (environment ``'Production'``) and
    ``hprod`` (everything else). The wizard step shown is the ``step`` query
    parameter when present, otherwise it is derived from the campaign state.

    NOTE(review): unlike the campaign list page, this handler does not check
    the ``campaigns`` view permission — confirm whether that is intended.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    campaign = get_campaign(db, campaign_id)
    if not campaign:
        return RedirectResponse(url="/safe-patching")

    sessions = get_campaign_sessions(db, campaign_id)
    stats = get_campaign_stats(db, campaign_id)
    qw_stats = get_quickwin_stats(db, campaign_id)

    # Partition the sessions in one pass; each list keeps the original order.
    hprod, prod, excluded = [], [], []
    for sess in sessions:
        if sess.status == 'excluded':
            excluded.append(sess)
        elif sess.environnement == 'Production':
            prod.append(sess)
        else:
            hprod.append(sess)

    # Safe patching command and its exclusion list, displayed by the template.
    safe_cmd = build_yum_command()
    safe_excludes = build_safe_excludes()

    # Default step: post-check once anything has been patched, execution once
    # at least one pending session has validated prerequisites, else prereqs.
    if qw_stats.hprod_patched > 0 or qw_stats.prod_patched > 0:
        default_step = "postcheck"
    else:
        has_ready_pending = any(
            sess.prereq_validated for sess in sessions if sess.status == 'pending'
        )
        default_step = "execute" if has_ready_pending else "prereqs"

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "c": campaign,
        "sessions": sessions,
        "stats": stats,
        "qw_stats": qw_stats,
        "hprod": hprod,
        "prod": prod,
        "excluded": excluded,
        "safe_cmd": safe_cmd,
        "safe_excludes": safe_excludes,
        "current_step": request.query_params.get("step", default_step),
        "msg": request.query_params.get("msg"),
    })
    return templates.TemplateResponse("safe_patching_detail.html", ctx)
@router.post("/safe-patching/{campaign_id}/check-prereqs")
async def safe_patching_check_prereqs(request: Request, campaign_id: int, db=Depends(get_db),
                                      branch: str = Form("hprod")):
    """Run the prerequisite checks of a campaign, then return to the wizard.

    Anonymous users are redirected to ``/login``. On completion the browser
    is sent back to the campaign page on the ``prereqs`` step with
    ``msg=prereqs_done``.

    NOTE(review): ``branch`` is accepted from the form but is not forwarded;
    ``check_prereqs_campaign`` is called for the whole campaign.
    NOTE(review): no ``campaigns`` permission is checked here, unlike the
    create endpoint — confirm whether that is intended.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser issues a GET to /login rather than replaying
        # this POST (RedirectResponse defaults to 307).
        return RedirectResponse(url="/login", status_code=303)

    from ..services.prereq_service import check_prereqs_campaign

    # The (checked, auto_excluded) result is not used by this handler.
    check_prereqs_campaign(db, campaign_id)
    return RedirectResponse(
        url=f"/safe-patching/{campaign_id}?step=prereqs&msg=prereqs_done",
        status_code=303,
    )
@router.post("/safe-patching/{campaign_id}/bulk-exclude")
async def safe_patching_bulk_exclude(request: Request, campaign_id: int, db=Depends(get_db),
                                     session_ids: str = Form("")):
    """Exclude several patch sessions from a Quick Win campaign at once.

    ``session_ids`` is a comma-separated list of session ids; tokens that are
    not plain decimal integers are ignored. Each remaining id is passed to
    ``exclude_session`` with reason ``"autre"`` and the comment
    ``"Exclu du Quick Win"``. The browser is then redirected (303) to the
    campaign page with ``msg=excluded_<count>``.

    NOTE(review): ids are not checked against ``campaign_id`` in this
    handler, and no ``campaigns`` permission is checked — confirm whether
    ``exclude_session`` or the caller enforces either.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser issues a GET to /login rather than replaying
        # this POST (RedirectResponse defaults to 307).
        return RedirectResponse(url="/login", status_code=303)

    from ..services.campaign_service import exclude_session

    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as "²" that int() rejects with ValueError.
    tokens = (raw.strip() for raw in session_ids.split(","))
    ids = [int(tok) for tok in tokens if tok.isdecimal()]

    for sid in ids:
        exclude_session(db, sid, "autre", "Exclu du Quick Win", user.get("sub"))
    return RedirectResponse(
        url=f"/safe-patching/{campaign_id}?msg=excluded_{len(ids)}",
        status_code=303,
    )
# Pending sessions of a campaign whose server environment is not Production.
_PENDING_HPROD_SESSIONS_SQL = """
SELECT ps.id FROM patch_sessions ps
JOIN servers s ON ps.server_id = s.id
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
LEFT JOIN environments e ON de.environment_id = e.id
WHERE ps.campaign_id = :cid AND ps.status = 'pending' AND e.name != 'Production'
ORDER BY s.hostname
"""

# Pending sessions of a campaign whose server environment is Production.
_PENDING_PROD_SESSIONS_SQL = """
SELECT ps.id FROM patch_sessions ps
JOIN servers s ON ps.server_id = s.id
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
LEFT JOIN environments e ON de.environment_id = e.id
WHERE ps.campaign_id = :cid AND ps.status = 'pending' AND e.name = 'Production'
ORDER BY s.hostname
"""


@router.post("/safe-patching/{campaign_id}/execute")
async def safe_patching_execute(request: Request, campaign_id: int, db=Depends(get_db),
                                branch: str = Form("hprod")):
    """Start the safe patching run for one branch of a campaign.

    Selects the pending sessions of the requested branch, marks the campaign
    ``in_progress``, hands the session ids to ``start_execution`` and
    redirects (303) to the live terminal page. When the branch has no
    pending session the browser is sent back to the campaign page with
    ``msg=no_pending``. Anonymous users are redirected to ``/login``.

    NOTE(review): any ``branch`` value other than ``"hprod"`` selects the
    Production query — confirm the form can only send the two expected
    values.
    NOTE(review): with the LEFT JOINs, a server without an environment has
    ``e.name`` NULL and matches neither query, while the detail page lists
    such sessions under hprod — confirm which behavior is wanted.
    NOTE(review): no ``campaigns`` permission is checked here, unlike the
    create endpoint — confirm whether that is intended.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser issues a GET to /login rather than replaying
        # this POST (RedirectResponse defaults to 307).
        return RedirectResponse(url="/login", status_code=303)

    # Pending sessions of the requested branch, ordered by hostname.
    if branch == "hprod":
        pending_sql = _PENDING_HPROD_SESSIONS_SQL
    else:
        pending_sql = _PENDING_PROD_SESSIONS_SQL
    sessions = db.execute(text(pending_sql), {"cid": campaign_id}).fetchall()

    session_ids = [s.id for s in sessions]
    if not session_ids:
        return RedirectResponse(url=f"/safe-patching/{campaign_id}?msg=no_pending", status_code=303)

    # Mark the campaign as running before the work is handed off.
    db.execute(text("UPDATE campaigns SET status = 'in_progress' WHERE id = :cid"), {"cid": campaign_id})
    db.commit()

    # start_execution is given the database URL rather than this request's
    # session.
    start_execution(DATABASE_URL, campaign_id, session_ids, branch)
    return RedirectResponse(url=f"/safe-patching/{campaign_id}/terminal?branch={branch}", status_code=303)
@router.get("/safe-patching/{campaign_id}/terminal", response_class=HTMLResponse)
async def safe_patching_terminal(request: Request, campaign_id: int, db=Depends(get_db),
                                 branch: str = Query("hprod")):
    """Render the live terminal page of a campaign.

    Anonymous visitors are redirected to ``/login``; an unknown
    ``campaign_id`` redirects to ``/safe-patching``, as the campaign detail
    page does. The template receives the campaign as ``c`` and the
    ``branch`` query parameter.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    campaign = get_campaign(db, campaign_id)
    if not campaign:
        # Without this guard the template would be rendered with c=None.
        return RedirectResponse(url="/safe-patching")

    ctx = base_context(request, db, user)
    ctx.update({"app_name": APP_NAME, "c": campaign, "branch": branch})
    return templates.TemplateResponse("safe_patching_terminal.html", ctx)
@router.get("/safe-patching/{campaign_id}/stream")
async def safe_patching_stream(request: Request, campaign_id: int):
    """SSE endpoint streaming the execution log of a campaign in real time.

    Each queued message is sent as one ``data: <json>`` event. The stream
    ends after a message whose ``level`` is ``"done"``, or when the client
    disconnects. While the queue is empty an SSE comment line is sent as a
    keepalive. Unauthenticated requests get an empty 401 response.

    NOTE(review): ``get_stream`` is assumed to return a ``queue.Queue``-like
    object (the previous code called ``q.get(timeout=...)``); confirm it
    provides ``get_nowait()`` and raises ``queue.Empty``.
    """
    from queue import Empty

    # Every other endpoint of this router requires a logged-in user; a plain
    # 401 is returned here instead of a redirect to the HTML login page.
    if not get_current_user(request):
        return HTMLResponse(status_code=401)

    async def event_generator():
        q = get_stream(campaign_id)
        # Stop polling as soon as the client goes away.
        while not await request.is_disconnected():
            try:
                # Non-blocking read: a blocking q.get(timeout=...) here would
                # stall the whole event loop for the duration of the wait.
                msg = q.get_nowait()
            except Empty:
                yield ": keepalive\n\n"
                await asyncio.sleep(0.5)
                continue

            try:
                data = json.dumps(msg)
            except (TypeError, ValueError):
                # Skip a message that cannot be serialised rather than
                # tearing down the stream.
                continue

            yield f"data: {data}\n\n"
            if isinstance(msg, dict) and msg.get("level") == "done":
                break

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )