130 lines
5.8 KiB
Python
130 lines
5.8 KiB
Python
from fastapi import APIRouter, Request, Depends, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_current_user
|
|
from ..database import SessionLocal, SessionLocalDemo
|
|
from ..auth import verify_password, create_access_token, hash_password
|
|
from ..services.audit_service import log_login, log_logout, log_login_failed
|
|
from ..services.ldap_service import is_enabled as ldap_enabled, authenticate as ldap_authenticate
|
|
from ..config import APP_NAME, APP_VERSION
|
|
|
|
# Router on which the login/logout endpoints of this module are registered.
router = APIRouter()
# Jinja2 template loader. "app/templates" is a relative path, so it presumably
# resolves against the process working directory — confirm against deployment.
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login form.

    Opens a short-lived session on the main database only to ask the LDAP
    service whether LDAP is enabled, so the template can adapt to it.

    Args:
        request: Incoming request, forwarded to the template context.

    Returns:
        The rendered ``login.html`` template with no error message set.
    """
    session = SessionLocal()
    try:
        ldap_available = ldap_enabled(session)
    finally:
        # Always release the session, even if the LDAP lookup raised.
        session.close()

    context = {
        "request": request,
        "app_name": APP_NAME,
        "version": APP_VERSION,
        "error": None,
        "ldap_enabled": ldap_available,
    }
    return templates.TemplateResponse("login.html", context)
|
|
|
|
@router.post("/login")
async def login(request: Request, username: str = Form(...), password: str = Form(...),
                mode: str = Form("reel"), auth_method: str = Form("local")):
    """Authenticate a user from the login form and issue the session cookie.

    Flow:
      1. Pick the database from ``mode`` ("demo" -> demo DB, anything else -> main DB).
      2. Look the user up case-insensitively by username.
      3. Unknown user + LDAP requested and enabled: authenticate against LDAP and,
         on success, auto-provision a DISABLED local account that an administrator
         must activate and grant permissions to. The login is still refused.
      4. Known, active user: authenticate through LDAP (if requested on the form
         or if the account's ``auth_type`` is "ldap") or against the stored
         password hash otherwise.
      5. On success: create the access token, write the audit entry, and redirect.

    Args:
        request: Incoming request (passed to the audit log and the template).
        username: Login name submitted by the form.
        password: Password submitted by the form.
        mode: "demo" selects the demo database; any other value is treated as "reel".
        auth_method: "ldap" forces LDAP authentication; default is "local".

    Returns:
        On failure, the ``login.html`` template rendered with an error message.
        On success, a 303 redirect to ``/quickwin`` when the user's permitted
        modules are exactly {"quickwin"}, otherwise to ``/dashboard``, carrying
        an HTTP-only ``access_token`` cookie valid for 3600 seconds.
    """
    # Any value other than "demo" is served from the main database, so pin it
    # here: the "mode" claim stored in the token then always names the database
    # that was actually used instead of echoing an arbitrary client string.
    mode = "demo" if mode == "demo" else "reel"
    session_factory = SessionLocalDemo if mode == "demo" else SessionLocal
    db = session_factory()
    try:
        # display_name is fetched here as well so the success path does not need
        # a second round trip for data belonging to the same row.
        row = db.execute(
            text(
                "SELECT id, username, password_hash, role, is_active, auth_type, display_name "
                "FROM users WHERE LOWER(username) = LOWER(:u)"
            ),
            {"u": username},
        ).fetchone()

        ldap_is_on = ldap_enabled(db)

        def render_error(msg):
            """Re-render the login form with *msg* as the error."""
            return templates.TemplateResponse("login.html", {
                "request": request, "app_name": APP_NAME, "version": APP_VERSION,
                "error": msg, "ldap_enabled": ldap_is_on,
            })

        def reject(msg):
            """Record a failed-login audit entry, commit it, then render the error."""
            log_login_failed(db, request, username)
            db.commit()
            return render_error(msg)

        if row is None:
            if auth_method != "ldap" or not ldap_is_on:
                return reject("Utilisateur inconnu")

            # LDAP auto-provisioning: the user is unknown locally but may belong
            # to an authorised directory group.
            result = ldap_authenticate(db, username, password)
            if not result.get("ok"):
                return reject(result.get("msg") or "Authentification LDAP echouee")

            # Create the account DISABLED and without permissions; an admin has
            # to activate it and grant permissions before it can be used.
            db.execute(text("""
                INSERT INTO users (username, email, display_name, role, is_active, auth_type, password_hash)
                VALUES (:u, :e, :n, :r, false, 'ldap', '')
            """), {
                "u": username,
                "e": result.get("email", ""),
                "n": result.get("name", username),
                "r": result.get("default_role", "viewer"),
            })
            db.commit()
            # The failed-login entry doubles as the trace of the auto-creation.
            return reject("Compte cree mais en attente d'activation par un administrateur")

        if not row.is_active:
            return reject("Compte desactive")

        # Existing user: LDAP is used when the form asks for it or when the
        # account itself is flagged as an LDAP account.
        use_ldap = auth_method == "ldap" or row.auth_type == "ldap"
        if use_ldap:
            if not ldap_is_on:
                # NOTE(review): unlike the other rejections this one writes no
                # audit entry (behaviour kept as-is) — confirm this is intended.
                return render_error("LDAP non active")
            result = ldap_authenticate(db, username, password)
            if not result.get("ok"):
                return reject(result.get("msg") or "Authentification LDAP echouee")
        else:
            try:
                password_ok = verify_password(password, row.password_hash or "")
            except Exception:
                # Deliberate best-effort: any error raised by the verifier is
                # treated as a wrong password rather than a server error.
                password_ok = False
            if not password_ok:
                return reject("Mot de passe incorrect")

        # display name + auth type are embedded in the token for display purposes.
        claims = {
            "sub": row.username, "role": row.role, "uid": row.id, "mode": mode,
            "display": row.display_name, "auth": row.auth_type,
        }
        # Hand the encoder its own copy so the audit call below receives the
        # claims untouched whatever the encoder does with its argument.
        token = create_access_token(dict(claims))
        log_login(db, request, claims)
        db.commit()

        # Users whose only permitted module is "quickwin" land directly on it.
        perms = db.execute(
            text("SELECT module FROM user_permissions WHERE user_id = :uid"),
            {"uid": row.id},
        ).fetchall()
        modules = {r.module for r in perms}
        redirect_url = "/quickwin" if modules == {"quickwin"} else "/dashboard"

        response = RedirectResponse(url=redirect_url, status_code=303)
        response.set_cookie(key="access_token", value=token, httponly=True, samesite="lax", max_age=3600)
        return response
    finally:
        db.close()
|
|
|
|
@router.get("/logout")
async def logout(request: Request):
    """Log the current user out and send them back to the login page.

    When a user is resolved from the request, a logout audit entry is written
    to the database matching the ``mode`` recorded for that user ("demo" ->
    demo database, anything else -> main database).

    Args:
        request: Incoming request, used to resolve the user and for the audit entry.

    Returns:
        A 302 redirect to ``/login`` with the ``access_token`` cookie deleted.
    """
    current = get_current_user(request)
    if current:
        session_factory = SessionLocal
        if current.get("mode", "reel") == "demo":
            session_factory = SessionLocalDemo

        session = session_factory()
        try:
            log_logout(session, request, current)
            session.commit()
        finally:
            # Release the session whether or not the audit write succeeded.
            session.close()

    redirect = RedirectResponse(url="/login", status_code=302)
    redirect.delete_cookie("access_token")
    return redirect
|