"""Router campagnes — creation, prereqs, assignation, workflow""" from datetime import datetime from fastapi import APIRouter, Request, Depends, Query, Form from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy import text from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context from ..services.campaign_service import ( list_campaigns, get_campaign, get_campaign_sessions, get_campaign_stats, create_campaign_from_planning, get_servers_for_planning, update_campaign_status, exclude_session, restore_session, validate_prereq, get_prereq_stats, can_plan_campaign, assign_operator, unassign_operator, get_operator_count, is_forced, update_session_schedule, get_operator_limit, set_operator_limit, get_campaign_operator_limits, ) from ..services.prereq_service import check_prereqs_campaign, check_single_prereq from ..services.secrets_service import get_secret from ..services.audit_service import ( log_campaign_create, log_campaign_status, log_campaign_delete, log_session_exclude, log_session_assign, log_session_take, log_session_release, log_prereq_check, ) from ..config import APP_NAME router = APIRouter() templates = Jinja2Templates(directory="app/templates") EXCLUSION_REASONS = [ ("obsolete", "Fin de vie (EOL)"), ("creneau_inadequat", "Creneau non adequat"), ("intervention_non_secops", "Intervention non-SecOps prevue"), ("report_cycle", "Report au cycle suivant"), ("non_patchable", "Serveur non patchable"), ("autre", "Autre"), ] def _get_max_servers(db): v = get_secret(db, "max_servers_per_operator") try: return int(v) if v else 0 except (ValueError, TypeError): return 0 @router.get("/campaigns", response_class=HTMLResponse) async def campaigns_list(request: Request, db=Depends(get_db), year: int = Query(None), status: str = Query(None)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") if not year: year = datetime.now().year perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/dashboard") campaigns = list_campaigns(db, year=year, status=status) now = datetime.now() current_week = now.isocalendar()[1] planned_weeks = db.execute(text(""" SELECT DISTINCT pp.week_number, pp.week_code, pp.week_start, pp.week_end, string_agg(DISTINCT d.name || ' (' || pp.env_scope || ')', ', ' ORDER BY d.name || ' (' || pp.env_scope || ')') as scope FROM patch_planning pp LEFT JOIN domains d ON pp.domain_code = d.code WHERE pp.year = :y AND pp.status = 'open' AND pp.domain_code IS NOT NULL AND pp.week_number >= :cw GROUP BY pp.week_number, pp.week_code, pp.week_start, pp.week_end ORDER BY pp.week_number """), {"y": year, "cw": current_week}).fetchall() ctx = base_context(request, db, user) ctx.update({ "app_name": APP_NAME, "campaigns": campaigns, "year": year, "status_filter": status, "planned_weeks": planned_weeks, }) return templates.TemplateResponse("campaigns.html", ctx) @router.get("/campaigns/preview", response_class=HTMLResponse) async def campaign_preview(request: Request, db=Depends(get_db), year: int = Query(...), week: int = Query(...)): user = get_current_user(request) if not user: return HTMLResponse("

Non autorise

", status_code=401) perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return HTMLResponse("

Acces interdit

", status_code=403) servers, planning = get_servers_for_planning(db, year, week) scope = ", ".join(set(f"{p.domain_name} ({p.env_scope})" for p in planning if p.domain_code)) return templates.TemplateResponse("partials/campaign_preview.html", { "request": request, "servers": servers, "scope": scope, "week": week, "year": year, "count": len(servers), }) @router.post("/campaigns/create") async def campaign_create(request: Request, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns") form = await request.form() year = int(form.get("year", datetime.now().year)) week = int(form.get("week_number", 0)) label = form.get("label", f"Patch S{week:02d} {year}") excluded = [] for key in form.keys(): if key.startswith("exclude_"): excluded.append(int(key.replace("exclude_", ""))) try: cid = create_campaign_from_planning(db, year, week, label, user.get("uid"), excluded) except Exception as e: db.rollback() err = str(e) if "unique" in err.lower() or "duplicate" in err.lower(): return RedirectResponse(url=f"/campaigns?year={year}&msg=already_exists", status_code=303) import traceback; traceback.print_exc() return RedirectResponse(url=f"/campaigns?year={year}&msg=create_error", status_code=303) if not cid: return RedirectResponse(url=f"/campaigns?year={year}&msg=no_servers", status_code=303) log_campaign_create(db, request, user, cid, label) db.commit() return RedirectResponse(url=f"/campaigns/{cid}", status_code=303) @router.get("/campaigns/{campaign_id}", response_class=HTMLResponse) async def campaign_detail(request: Request, campaign_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/dashboard") campaign = get_campaign(db, campaign_id) if not campaign: return RedirectResponse(url="/campaigns") sessions = get_campaign_sessions(db, campaign_id) stats = get_campaign_stats(db, campaign_id) prereq = get_prereq_stats(db, campaign_id) can_plan_flag = can_plan_campaign(db, campaign_id) perms = get_user_perms(db, user) intervenants = db.execute(text( "SELECT id, display_name FROM users WHERE is_active = true ORDER BY display_name" )).fetchall() op_limits = get_campaign_operator_limits(db, campaign_id) # Compteur par operateur op_counts = db.execute(text(""" SELECT u.display_name, COUNT(*) as count FROM patch_sessions ps JOIN users u ON ps.intervenant_id = u.id WHERE ps.campaign_id = :cid AND ps.status NOT IN ('excluded','cancelled') GROUP BY u.display_name ORDER BY count DESC """), {"cid": campaign_id}).fetchall() ctx = base_context(request, db, user) ctx.update({ "app_name": APP_NAME, "c": campaign, "sessions": sessions, "stats": stats, "prereq": prereq, "can_plan": can_plan_flag, "exclusion_reasons": EXCLUSION_REASONS, "can_edit_campaigns": can_edit(perms, "campaigns"), "can_admin_campaigns": can_admin(perms, "campaigns"), "intervenants": intervenants, "op_limits": op_limits, "op_counts": op_counts, "msg": request.query_params.get("msg"), }) return templates.TemplateResponse("campaign_detail.html", ctx) @router.post("/campaigns/{campaign_id}/status") async def campaign_status_change(request: Request, campaign_id: int, db=Depends(get_db), new_status: str = Form(...)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=forbidden", status_code=303) if new_status == "planned" and not can_plan_campaign(db, campaign_id): return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=prereq_needed", status_code=303) campaign = get_campaign(db, campaign_id) old_status = campaign.status if campaign else "unknown" update_campaign_status(db, campaign_id, new_status) log_campaign_status(db, request, user, campaign_id, old_status, new_status) db.commit() return RedirectResponse(url=f"/campaigns/{campaign_id}", status_code=303) @router.post("/campaigns/{campaign_id}/delete") async def campaign_delete(request: Request, campaign_id: int, db=Depends(get_db)): """Supprime completement une campagne (admin/coordinateur)""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns", status_code=303) campaign = get_campaign(db, campaign_id) if not campaign: return RedirectResponse(url="/campaigns", status_code=303) year = campaign.year # Supprimer sessions puis campagne db.execute(text("DELETE FROM campaign_operator_limits WHERE campaign_id = :cid"), {"cid": campaign_id}) db.execute(text("DELETE FROM patch_sessions WHERE campaign_id = :cid"), {"cid": campaign_id}) db.execute(text("DELETE FROM campaigns WHERE id = :cid"), {"cid": campaign_id}) db.commit() return RedirectResponse(url=f"/campaigns?year={year}&msg=deleted", status_code=303) @router.post("/campaigns/session/{session_id}/prereq") async def session_prereq(request: Request, session_id: int, db=Depends(get_db), prereq_ssh: str = Form(...), prereq_satellite: str = Form(...), rollback_method: str = Form(""), rollback_justif: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/campaigns") validate_prereq(db, session_id, prereq_ssh, prereq_satellite, rollback_method or None, rollback_justif, user.get("sub")) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=prereq_saved", status_code=303) @router.post("/campaigns/{campaign_id}/check-prereqs") async def campaign_check_prereqs(request: Request, campaign_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url=f"/campaigns/{campaign_id}") checked, auto_excluded = check_prereqs_campaign(db, campaign_id) log_prereq_check(db, request, user, campaign_id, checked, auto_excluded) db.commit() return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=checked_{checked}_{auto_excluded}", status_code=303) @router.post("/campaigns/session/{session_id}/check-prereq") async def session_check_prereq(request: Request, session_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/campaigns") check_single_prereq(db, session_id) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=prereq_checked", status_code=303) @router.post("/campaigns/session/{session_id}/exclude") async def session_exclude(request: Request, session_id: int, db=Depends(get_db), reason: str = Form(...), detail: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns") exclude_session(db, session_id, reason, detail, user.get("sub")) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=excluded", status_code=303) @router.post("/campaigns/session/{session_id}/restore") async def session_restore(request: Request, session_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns") restore_session(db, session_id) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=restored", status_code=303) # --- Assignation operateurs --- @router.post("/campaigns/session/{session_id}/take") async def session_take(request: Request, session_id: int, db=Depends(get_db)): """Operateur prend un serveur""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/campaigns") row = db.execute(text("SELECT campaign_id, intervenant_id, forced_assignment FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() if row.intervenant_id: return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=already_taken", status_code=303) # Verifier limite par campagne pour cet operateur limit = get_operator_limit(db, row.campaign_id, user.get("uid")) if limit > 0: current = get_operator_count(db, row.campaign_id, user.get("uid")) if current >= limit: return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=limit_reached", status_code=303) assign_operator(db, session_id, user.get("uid")) return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=taken", status_code=303) @router.post("/campaigns/session/{session_id}/release") async def session_release(request: Request, session_id: int, db=Depends(get_db)): """Operateur se desassigne (sauf si forced)""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/campaigns") if is_forced(db, session_id): row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=forced_cant_release", status_code=303) unassign_operator(db, session_id) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=released", status_code=303) @router.post("/campaigns/session/{session_id}/assign") async def session_assign(request: Request, session_id: int, db=Depends(get_db), operator_id: str = Form(""), forced: str = Form("")): """Coordinateur assigne un operateur (peut forcer)""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns") oid = int(operator_id) if operator_id else None is_forced_flag = forced == "on" if oid: assign_operator(db, session_id, oid, forced=is_forced_flag) else: unassign_operator(db, session_id) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=assigned", status_code=303) # --- Limites operateurs par campagne --- @router.post("/campaigns/{campaign_id}/operator-limit") async def set_op_limit(request: Request, campaign_id: int, db=Depends(get_db), operator_id: int = Form(...), max_servers: int = Form(0), note: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url=f"/campaigns/{campaign_id}") set_operator_limit(db, campaign_id, operator_id, max_servers, note or None) return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=limit_set", status_code=303) # --- Assignations par defaut --- @router.get("/assignments", response_class=HTMLResponse) async def assignments_page(request: Request, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns") rules = db.execute(text(""" SELECT da.*, u.display_name FROM default_assignments da JOIN users u ON da.user_id = u.id ORDER BY da.priority, da.rule_type """)).fetchall() operators = db.execute(text( "SELECT id, display_name FROM users WHERE is_active = true ORDER BY display_name" )).fetchall() domains = db.execute(text("SELECT code, name FROM domains ORDER BY display_order")).fetchall() zones = db.execute(text("SELECT DISTINCT name FROM zones ORDER BY name")).fetchall() app_types = db.execute(text( "SELECT DISTINCT app_type FROM server_specifics WHERE app_type IS NOT NULL ORDER BY app_type" )).fetchall() ctx = base_context(request, db, user) ctx.update({ "app_name": APP_NAME, "rules": rules, "operators": operators, "domains": domains, "zones": zones, "app_types": [r.app_type for r in app_types], "msg": request.query_params.get("msg"), }) return templates.TemplateResponse("assignments.html", ctx) @router.post("/assignments/add") async def assignment_add(request: Request, db=Depends(get_db), rule_type: str = Form(...), rule_value: str = Form(...), user_id: int = Form(...), priority: int = Form(10), note: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/assignments") try: db.execute(text(""" INSERT INTO default_assignments (rule_type, rule_value, user_id, priority, note) VALUES (:rt, :rv, :uid, :p, :n) """), {"rt": rule_type, "rv": rule_value.strip(), "uid": user_id, "p": priority, "n": note or None}) db.commit() return RedirectResponse(url="/assignments?msg=added", status_code=303) except Exception: db.rollback() return RedirectResponse(url="/assignments?msg=error", status_code=303) @router.post("/assignments/{rule_id}/delete") async def assignment_delete(request: Request, rule_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/assignments") db.execute(text("DELETE FROM default_assignments WHERE id = :id"), {"id": rule_id}) db.commit() return RedirectResponse(url="/assignments?msg=deleted", status_code=303) # --- Bulk actions campagne --- @router.post("/campaigns/{campaign_id}/bulk/take") async def bulk_take(request: Request, campaign_id: int, db=Depends(get_db), session_ids: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "campaigns"): return RedirectResponse(url="/campaigns") ids = [int(x) for x in session_ids.split(",") if x.strip().isdigit()] limit = get_operator_limit(db, campaign_id, user.get("uid")) current = get_operator_count(db, campaign_id, user.get("uid")) for sid in ids: if limit > 0 and current >= limit: break row = db.execute(text("SELECT intervenant_id FROM patch_sessions WHERE id = :id"), {"id": sid}).fetchone() if row and not row.intervenant_id: assign_operator(db, sid, user.get("uid")) current += 1 return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=bulk_taken", status_code=303) @router.post("/campaigns/{campaign_id}/bulk/assign") async def bulk_assign(request: Request, campaign_id: int, db=Depends(get_db), session_ids: str = Form(""), operator_id: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url=f"/campaigns/{campaign_id}", status_code=303) ids = [int(x) for x in session_ids.split(",") if x.strip().isdigit()] oid = int(operator_id) if operator_id else None for sid in ids: if oid: assign_operator(db, sid, oid) else: unassign_operator(db, sid) return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=bulk_assigned", status_code=303) @router.post("/campaigns/{campaign_id}/bulk/exclude") async def bulk_exclude(request: Request, campaign_id: int, db=Depends(get_db), session_ids: str = Form(""), reason: str = Form("autre")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url=f"/campaigns/{campaign_id}", status_code=303) ids = [int(x) for x in session_ids.split(",") if x.strip().isdigit()] for sid in ids: exclude_session(db, sid, reason, "Exclusion groupée", user.get("sub")) return RedirectResponse(url=f"/campaigns/{campaign_id}?msg=bulk_excluded", status_code=303) @router.post("/campaigns/session/{session_id}/schedule") async def session_schedule(request: Request, session_id: int, db=Depends(get_db), date_prevue: str = Form(""), heure_prevue: str = Form("")): """Coordinateur ajuste date/heure""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "campaigns"): return RedirectResponse(url="/campaigns") update_session_schedule(db, session_id, date_prevue or None, heure_prevue or None) row = db.execute(text("SELECT campaign_id FROM patch_sessions WHERE id = :id"), {"id": session_id}).fetchone() return RedirectResponse(url=f"/campaigns/{row.campaign_id}?msg=scheduled", status_code=303)