patchcenter/app/routers/planning.py
Khalid MOUTAOUAKIL 13290c1ebb Phase 1 securite: permission checks sur tous les routers
- auth: verification is_active au login (compte desactive = bloque)
- settings: enforcement backend can_edit(settings) + role/section
- servers: can_view/can_edit(servers) sur toutes les routes
- planning: can_view/can_edit(planning) sur toutes les routes
- specifics: can_view/can_edit(specifics) sur toutes les routes
- contacts: rattache au module servers (can_view/can_edit)
- campaigns: can_view/can_edit(campaigns) sur toutes les routes manquantes
- audit/audit_full: can_view/can_edit(audit) sur toutes les routes
- qualys: can_view/can_edit(qualys) sur toutes les routes
- safe_patching: perm checks + authentification sur SSE stream
- quickwin: can_view/can_edit(campaigns|quickwin) sur toutes les routes

97 points d'injection securises, 0 route sans controle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 16:46:05 +02:00

216 lines
9.1 KiB
Python

"""Router planning — planning annuel de patching par domaine"""
from datetime import datetime, date, timedelta
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
MONTHS = ["Jan", "Fev", "Mar", "Avr", "Mai", "Jun", "Jul", "Aou", "Sep", "Oct", "Nov", "Dec"]
DOMAIN_COLORS = {
"TRA": "#E67E22", "PEA": "#8E44AD", "FL": "#2ECC71", "BI": "#F1C40F",
"INF": "#3498DB", "GES": "#1ABC9C", "EMV": "#E74C3C", "DMZ": "#D4A0A0",
}
ENV_SCOPES = ["prod", "hprod", "all", "pilot", "prod_pilot"]
STATUSES = ["open", "freeze", "holiday", "empty"]
def _week_dates(year, week_num):
jan4 = date(year, 1, 4)
start_of_w1 = jan4 - timedelta(days=jan4.isoweekday() - 1)
monday = start_of_w1 + timedelta(weeks=week_num - 1)
sunday = monday + timedelta(days=6)
return monday, sunday
def _get_planning_data(db, year):
rows = db.execute(text("""
SELECT pp.*, d.name as domain_name
FROM patch_planning pp
LEFT JOIN domains d ON pp.domain_code = d.code
WHERE pp.year = :y
ORDER BY pp.week_number, pp.domain_code
"""), {"y": year}).fetchall()
domains = db.execute(text("""
SELECT d.code, d.name, COUNT(s.id) as srv_count
FROM domains d
LEFT JOIN domain_environments de ON de.domain_id = d.id
LEFT JOIN servers s ON s.domain_env_id = de.id AND s.etat = 'en_production'
GROUP BY d.code, d.name, d.display_order
ORDER BY d.display_order
""")).fetchall()
freeze_weeks = set()
grid = {}
for r in rows:
if r.status == 'freeze':
freeze_weeks.add(r.week_number)
if r.domain_code:
if r.domain_code not in grid:
grid[r.domain_code] = {}
grid[r.domain_code][r.week_number] = {
"id": r.id, "cycle": r.cycle, "env_scope": r.env_scope,
"note": r.note, "status": r.status,
}
all_domains = db.execute(text("SELECT code, name FROM domains ORDER BY display_order")).fetchall()
# Annees disponibles
years_in_db = db.execute(text("SELECT DISTINCT year FROM patch_planning ORDER BY year")).fetchall()
available_years = [r.year for r in years_in_db]
return rows, domains, grid, freeze_weeks, all_domains, available_years
@router.get("/planning", response_class=HTMLResponse)
async def planning_page(request: Request, db=Depends(get_db),
year: int = Query(None), msg: str = Query(None)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
if not year:
year = datetime.now().year
rows, domains, grid, freeze_weeks, all_domains, available_years = _get_planning_data(db, year)
now = datetime.now()
next_week = now.isocalendar()[1] + 1
if next_week > 53:
next_week = 1
perms = get_user_perms(db, user)
if not can_view(perms, "planning"):
return RedirectResponse(url="/dashboard")
return templates.TemplateResponse("planning.html", {
"request": request, "user": user, "perms": perms, "app_name": APP_NAME,
"year": year, "domains": domains, "grid": grid,
"freeze_weeks": freeze_weeks, "months": MONTHS,
"domain_colors": DOMAIN_COLORS, "weeks": range(1, 54),
"entries": rows, "all_domains": all_domains,
"env_scopes": ENV_SCOPES, "statuses": STATUSES,
"available_years": available_years, "msg": msg,
"default_week": next_week,
})
@router.post("/planning/add")
async def planning_add(request: Request, db=Depends(get_db),
year: str = Form(...), week_number: str = Form(...),
domain_code: str = Form(""), env_scope: str = Form("all"),
cycle: str = Form(""), status: str = Form("open"), note: str = Form("")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_edit(perms, "planning"):
return RedirectResponse(url="/planning")
y = int(year)
wn = int(week_number) if week_number else 0
cyc = int(cycle) if cycle.strip() else None
if not wn or wn < 1 or wn > 53:
return RedirectResponse(url=f"/planning?year={y}&msg=err_week", status_code=303)
if not domain_code and status == 'open':
return RedirectResponse(url=f"/planning?year={y}&msg=err_domain", status_code=303)
# Pas dans le passe — semaine en cours acceptee lundi/mardi (MEP urgente)
now = datetime.now()
current_week = now.isocalendar()[1]
current_year = now.isocalendar()[0]
weekday = now.isoweekday() # 1=lundi, 7=dimanche
if y < current_year:
return RedirectResponse(url=f"/planning?year={y}&msg=err_past", status_code=303)
if y == current_year:
if wn < current_week:
return RedirectResponse(url=f"/planning?year={y}&msg=err_past", status_code=303)
if wn == current_week and weekday > 2:
return RedirectResponse(url=f"/planning?year={y}&msg=err_past_wed", status_code=303)
ws, we = _week_dates(y, wn)
wc = f"S{wn:02d}"
db.execute(text("""
INSERT INTO patch_planning (year, week_number, week_code, week_start, week_end, cycle, domain_code, env_scope, status, note)
VALUES (:y, :wn, :wc, :ws, :we, :cyc, :dc, :es, :st, :nt)
"""), {"y": y, "wn": wn, "wc": wc, "ws": ws, "we": we,
"cyc": cyc, "dc": domain_code or None, "es": env_scope, "st": status,
"nt": note or None})
db.commit()
return RedirectResponse(url=f"/planning?year={y}&msg=add", status_code=303)
@router.post("/planning/{entry_id}/edit")
async def planning_edit(request: Request, entry_id: int, db=Depends(get_db),
domain_code: str = Form(""), env_scope: str = Form("all"),
cycle: str = Form(""), status: str = Form("open"), note: str = Form("")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_edit(perms, "planning"):
return RedirectResponse(url="/planning")
row = db.execute(text("SELECT year FROM patch_planning WHERE id = :id"), {"id": entry_id}).fetchone()
cyc = int(cycle) if cycle.strip() else None
db.execute(text("""
UPDATE patch_planning SET domain_code = :dc, env_scope = :es, cycle = :cyc, status = :st, note = :nt
WHERE id = :id
"""), {"dc": domain_code or None, "es": env_scope, "cyc": cyc,
"st": status, "nt": note or None, "id": entry_id})
db.commit()
y = row.year if row else datetime.now().year
return RedirectResponse(url=f"/planning?year={y}&msg=edit", status_code=303)
@router.post("/planning/{entry_id}/delete")
async def planning_delete(request: Request, entry_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_edit(perms, "planning"):
return RedirectResponse(url="/planning")
row = db.execute(text("SELECT year FROM patch_planning WHERE id = :id"), {"id": entry_id}).fetchone()
db.execute(text("DELETE FROM patch_planning WHERE id = :id"), {"id": entry_id})
db.commit()
y = row.year if row else datetime.now().year
return RedirectResponse(url=f"/planning?year={y}&msg=delete", status_code=303)
@router.post("/planning/duplicate")
async def planning_duplicate(request: Request, db=Depends(get_db),
source_year: int = Form(...), target_year: int = Form(...)):
"""Duplique le planning d'une annee vers une autre"""
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_edit(perms, "planning"):
return RedirectResponse(url="/planning")
# Verifier que l'annee cible est vide
existing = db.execute(text("SELECT COUNT(*) FROM patch_planning WHERE year = :y"),
{"y": target_year}).scalar()
if existing > 0:
return RedirectResponse(url=f"/planning?year={target_year}&msg=exists", status_code=303)
# Copier toutes les entrees en recalculant les dates
sources = db.execute(text("SELECT * FROM patch_planning WHERE year = :y ORDER BY week_number"),
{"y": source_year}).fetchall()
for s in sources:
ws, we = _week_dates(target_year, s.week_number)
wc = f"S{s.week_number:02d}"
db.execute(text("""
INSERT INTO patch_planning (year, week_number, week_code, week_start, week_end, cycle, domain_code, env_scope, status, note)
VALUES (:y, :wn, :wc, :ws, :we, :cyc, :dc, :es, :st, :nt)
"""), {"y": target_year, "wn": s.week_number, "wc": wc, "ws": ws, "we": we,
"cyc": s.cycle, "dc": s.domain_code, "es": s.env_scope, "st": s.status,
"nt": s.note})
db.commit()
return RedirectResponse(url=f"/planning?year={target_year}&msg=duplicate", status_code=303)