"""PatchCenter v2 — Entry point FastAPI""" from fastapi import FastAPI, Request from fastapi.responses import RedirectResponse from fastapi.staticfiles import StaticFiles from starlette.middleware.base import BaseHTTPMiddleware from .config import APP_NAME, APP_VERSION from .dependencies import get_current_user, get_user_perms from .database import SessionLocal, SessionLocalDemo from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, quickwin, referentiel, patching, applications class PermissionsMiddleware(BaseHTTPMiddleware): """Injecte user + perms dans request.state pour tous les templates. Gère aussi la redirection si force_password_change est activé.""" async def dispatch(self, request: Request, call_next): user = get_current_user(request) perms = {} must_change_pwd = False if user: # Sélectionner la base selon le mode JWT (prod/demo) factory = SessionLocalDemo if user.get("mode") == "demo" else SessionLocal db = factory() try: perms = get_user_perms(db, user) # Check force_password_change from sqlalchemy import text row = db.execute(text("SELECT force_password_change FROM users WHERE id=:uid"), {"uid": user.get("uid")}).fetchone() if row and row.force_password_change: must_change_pwd = True finally: db.close() request.state.user = user request.state.perms = perms request.state.must_change_pwd = must_change_pwd # Redirect vers change-password si forcé (sauf pour les routes de changement/logout/static) if must_change_pwd and user: allowed = ("/me/change-password", "/logout", "/static/") if not any(request.url.path.startswith(p) for p in allowed): from fastapi.responses import RedirectResponse return RedirectResponse(url="/me/change-password", status_code=303) response = await call_next(request) return response app = FastAPI(title=APP_NAME, version=APP_VERSION) app.add_middleware(PermissionsMiddleware) app.mount("/static", StaticFiles(directory="app/static"), name="static") app.include_router(auth.router) app.include_router(dashboard.router) app.include_router(servers.router) app.include_router(settings.router) app.include_router(users.router) app.include_router(campaigns.router) app.include_router(planning.router) app.include_router(specifics.router) app.include_router(audit.router) app.include_router(contacts.router) app.include_router(qualys.router) app.include_router(quickwin.router) app.include_router(referentiel.router) app.include_router(patching.router) app.include_router(applications.router) @app.get("/") async def root(request: Request): user = get_current_user(request) if user: return RedirectResponse(url="/dashboard") return RedirectResponse(url="/login") @app.get("/health") async def health(): return {"status": "ok", "app": APP_NAME, "version": APP_VERSION} # --- Error handlers --- from fastapi.templating import Jinja2Templates _error_templates = Jinja2Templates(directory="app/templates") @app.exception_handler(500) async def internal_error(request: Request, exc): return _error_templates.TemplateResponse("error.html", { "request": request, "code": 500, "title": "Application en maintenance", "message": "Une erreur interne est survenue. L'équipe technique a été notifiée.", }, status_code=500) @app.exception_handler(404) async def not_found(request: Request, exc): return _error_templates.TemplateResponse("error.html", { "request": request, "code": 404, "title": "Page introuvable", "message": "La page demandée n'existe pas.", }, status_code=404) @app.exception_handler(Exception) async def generic_error(request: Request, exc): import traceback traceback.print_exc() return _error_templates.TemplateResponse("error.html", { "request": request, "code": 500, "title": "Application en maintenance", "message": "Une erreur interne est survenue. L'équipe technique a été notifiée.", }, status_code=500)