"""Authentication routes: login form, credential check (local or LDAP), logout."""

from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text

from ..dependencies import get_current_user
from ..database import SessionLocal, SessionLocalDemo
from ..auth import verify_password, create_access_token, hash_password
from ..services.audit_service import log_login, log_logout, log_login_failed
from ..services.ldap_service import is_enabled as ldap_enabled, authenticate as ldap_authenticate
from ..config import APP_NAME, APP_VERSION

router = APIRouter()
templates = Jinja2Templates(directory="app/templates")

# The two values of the "mode" form field / token claim that this module
# distinguishes. Anything that is not "demo" selects the real database.
_MODE_DEMO = "demo"
_MODE_REAL = "reel"

# Lifetime (seconds) of the "access_token" cookie set on successful login.
_COOKIE_MAX_AGE = 3600

_SELECT_USER_SQL = (
    "SELECT id, username, password_hash, role, is_active, auth_type "
    "FROM users WHERE LOWER(username) = LOWER(:u)"
)

_INSERT_LDAP_USER_SQL = (
    "INSERT INTO users (username, email, full_name, role, is_active, auth_type, password_hash) "
    "VALUES (:u, :e, :n, :r, true, 'ldap', '')"
)


def _normalize_mode(mode):
    """Collapse an arbitrary submitted mode onto "demo" or "reel"."""
    return _MODE_DEMO if mode == _MODE_DEMO else _MODE_REAL


def _session_factory(mode):
    """Return the session factory for *mode* (demo DB only for "demo")."""
    return SessionLocalDemo if mode == _MODE_DEMO else SessionLocal


def _fetch_user(db, username):
    """Return the users row matching *username* case-insensitively, or None."""
    return db.execute(text(_SELECT_USER_SQL), {"u": username}).fetchone()


def _render_login(request, error, ldap_on):
    """Render login.html with the given error message and LDAP flag."""
    return templates.TemplateResponse("login.html", {
        "request": request,
        "app_name": APP_NAME,
        "version": APP_VERSION,
        "error": error,
        "ldap_enabled": ldap_on,
    })


@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Serve the login form, telling the template whether LDAP is enabled."""
    db = SessionLocal()
    try:
        ldap_ok = ldap_enabled(db)
    finally:
        db.close()
    return _render_login(request, None, ldap_ok)


@router.post("/login")
async def login(request: Request, username: str = Form(...), password: str = Form(...),
                mode: str = Form("reel"), auth_method: str = Form("local")):
    """Authenticate a user and start a session.

    Args:
        request: Incoming request (passed to the audit helpers and template).
        username: Submitted login name; matched case-insensitively.
        password: Submitted password.
        mode: "demo" selects the demo database; every other value is treated
            as "reel" (the real database).
        auth_method: "ldap" requests LDAP authentication; otherwise the
            account's own ``auth_type`` decides.

    Returns:
        On success, a 303 redirect carrying the ``access_token`` cookie.
        On failure, the login page re-rendered with an error message; every
        failure is recorded through ``log_login_failed``.
    """
    # Normalise first so the token and audit payload never carry an
    # arbitrary client-supplied string in the "mode" claim.
    mode = _normalize_mode(mode)
    db = _session_factory(mode)()
    try:
        row = _fetch_user(db, username)
        ldap_is_on = ldap_enabled(db)

        def reject(message):
            # Record the failed attempt, then show the form again.
            log_login_failed(db, request, username)
            db.commit()
            return _render_login(request, message, ldap_is_on)

        # NOTE(review): the distinct messages below reveal whether a username
        # exists; kept unchanged because they are user-facing strings.
        if not row:
            # Unknown locally: only an LDAP login can proceed (auto-provision).
            if not (auth_method == "ldap" and ldap_is_on):
                return reject("Utilisateur inconnu")
            result = ldap_authenticate(db, username, password)
            if not result.get("ok"):
                return reject(result.get("msg") or "Authentification LDAP echouee")
            # Create the local account using the role supplied by the LDAP
            # result, falling back to "operator".
            db.execute(text(_INSERT_LDAP_USER_SQL), {
                "u": username,
                "e": result.get("email", ""),
                "n": result.get("name", username),
                "r": result.get("default_role", "operator"),
            })
            db.commit()
            row = _fetch_user(db, username)
        elif not row.is_active:
            return reject("Compte desactive")
        elif auth_method == "ldap" or row.auth_type == "ldap":
            # Existing account authenticated through LDAP, either because the
            # form asked for it or because the account is an LDAP account.
            if not ldap_is_on:
                return reject("LDAP non active")
            result = ldap_authenticate(db, username, password)
            if not result.get("ok"):
                return reject(result.get("msg") or "Authentification LDAP echouee")
        else:
            # Local password check. Any error raised by the verifier is
            # treated as a mismatch rather than a server error.
            try:
                password_ok = verify_password(password, row.password_hash or "")
            except Exception:
                password_ok = False
            if not password_ok:
                return reject("Mot de passe incorrect")

        # The selected mode travels in the token so later requests (e.g.
        # logout) can pick the same database.
        user = {"sub": row.username, "role": row.role, "uid": row.id, "mode": mode}
        # Pass a copy so the audit payload is unaffected by anything the
        # token helper might add to its argument.
        token = create_access_token(dict(user))
        log_login(db, request, user)
        db.commit()

        # Users whose only permitted module is "quickwin" land there directly.
        perms = db.execute(
            text("SELECT module FROM user_permissions WHERE user_id = :uid"),
            {"uid": row.id},
        ).fetchall()
        modules = {perm.module for perm in perms}
        redirect_url = "/quickwin" if modules == {"quickwin"} else "/dashboard"

        response = RedirectResponse(url=redirect_url, status_code=303)
        response.set_cookie(key="access_token", value=token, httponly=True,
                            samesite="lax", max_age=_COOKIE_MAX_AGE)
        return response
    finally:
        db.close()


@router.get("/logout")
async def logout(request: Request):
    """Audit the logout when a user is identified, clear the cookie, redirect to /login."""
    user = get_current_user(request)
    if user:
        # Write the audit entry to the database matching the token's mode.
        db = _session_factory(user.get("mode", "reel"))()
        try:
            log_logout(db, request, user)
            db.commit()
        finally:
            db.close()
    response = RedirectResponse(url="/login", status_code=302)
    response.delete_cookie("access_token")
    return response