patchcenter/app/routers/auth.py
Admin MPCZ 8479d7280e Users/Contacts: workflow profils + LDAP + sync iTop + etat aligne
- Users: 4 profils (admin/coordinator/operator/viewer) remplacent la matrix
- /users/add: picker contacts iTop (plus de creation libre)
- /me/change-password: flow force_password_change
- LDAP: service + section settings + option login
- Sync iTop contacts: filtre par teams (SecOps/iPOP/Externe/DSI/Admin DSI)
- Auto-desactivation users si contact inactif
- etat: alignement sur enum iTop (production/implementation/stock/obsolete)
- Menu: Contacts dans Administration, Serveurs en groupe repliable
- Audit bases: demo/prod via JWT mode

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:50:43 +02:00

103 lines
4.1 KiB
Python

from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_current_user
from ..database import SessionLocal, SessionLocalDemo
from ..auth import verify_password, create_access_token, hash_password
from ..services.audit_service import log_login, log_logout, log_login_failed
from ..services.ldap_service import is_enabled as ldap_enabled, authenticate as ldap_authenticate
from ..config import APP_NAME, APP_VERSION
# Router collecting the authentication endpoints declared in this module
# (GET/POST /login and GET /logout).
router = APIRouter()
# Template renderer used for "login.html". The directory is a relative path,
# so it presumably resolves against the process working directory -- confirm
# against how the application is launched.
templates = Jinja2Templates(directory="app/templates")
@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login form.

    A short-lived ``SessionLocal`` session is opened only to ask the LDAP
    service whether it is enabled; the answer is handed to the template as
    ``ldap_enabled``. The form is rendered with no error message.
    """
    session = SessionLocal()
    try:
        ldap_available = ldap_enabled(session)
    finally:
        # Release the session whether or not the LDAP lookup succeeded.
        session.close()

    context = {
        "request": request,
        "app_name": APP_NAME,
        "version": APP_VERSION,
        "error": None,
        "ldap_enabled": ldap_available,
    }
    return templates.TemplateResponse("login.html", context)
@router.post("/login")
async def login(request: Request, username: str = Form(...), password: str = Form(...),
                mode: str = Form("reel"), auth_method: str = Form("local")):
    """Authenticate a user from the login form and issue the session cookie.

    Args:
        request: Incoming request; forwarded to the audit helpers and the
            template.
        username: Login name, matched case-insensitively against ``users``.
        password: Clear-text password from the form.
        mode: ``"demo"`` selects the demo database; any other value selects
            the real one and is recorded as ``"reel"``.
        auth_method: ``"ldap"`` forces LDAP authentication; otherwise the
            account's own ``auth_type`` decides.

    Returns:
        On success, a 303 redirect (``/quickwin`` when that is the user's only
        permitted module, ``/dashboard`` otherwise) carrying the
        ``access_token`` cookie. On failure, the login page re-rendered with
        an error message.
    """
    # The form value is untrusted. Normalise it so the mode stored in the JWT
    # always names the database that was actually used for this login.
    mode = "demo" if mode == "demo" else "reel"
    factory = SessionLocalDemo if mode == "demo" else SessionLocal
    db = factory()
    try:
        row = db.execute(
            text("SELECT id, username, password_hash, role, is_active, auth_type FROM users WHERE LOWER(username) = LOWER(:u)"),
            {"u": username},
        ).fetchone()
        ldap_is_on = ldap_enabled(db)

        def reject(msg):
            """Audit the failed attempt, then re-render the login form with *msg*."""
            log_login_failed(db, request, username)
            db.commit()
            return templates.TemplateResponse("login.html", {
                "request": request, "app_name": APP_NAME, "version": APP_VERSION,
                "error": msg, "ldap_enabled": ldap_is_on
            })

        # NOTE(review): the distinct messages below let a caller tell unknown,
        # disabled and existing accounts apart (username enumeration). They
        # are kept as-is because they are user-facing strings.
        if not row:
            return reject("Utilisateur inconnu")
        if not row.is_active:
            return reject("Compte desactive")

        # LDAP is used when requested on the form or imposed by the account.
        use_ldap = (auth_method == "ldap") or (row.auth_type == "ldap")
        if use_ldap:
            if not ldap_is_on:
                # Audited like every other rejection.
                return reject("LDAP non active")
            result = ldap_authenticate(db, username, password)
            if not result.get("ok"):
                return reject(result.get("msg") or "Authentification LDAP echouee")
        else:
            try:
                ok = verify_password(password, row.password_hash or "")
            except Exception:
                # Deliberate best effort: any verifier error (presumably a
                # malformed stored hash) is treated as a wrong password.
                ok = False
            if not ok:
                return reject("Mot de passe incorrect")

        # The same claims go into the token and into the audit entry.
        user = {"sub": row.username, "role": row.role, "uid": row.id, "mode": mode}
        token = create_access_token(dict(user))
        log_login(db, request, user)
        db.commit()

        # Users whose only permitted module is "quickwin" land directly on it.
        perms = db.execute(
            text("SELECT module FROM user_permissions WHERE user_id = :uid"),
            {"uid": row.id},
        ).fetchall()
        modules = {r.module for r in perms}
        redirect_url = "/quickwin" if modules == {"quickwin"} else "/dashboard"

        response = RedirectResponse(url=redirect_url, status_code=303)
        # NOTE(review): the cookie has no `secure` flag; confirm whether the
        # deployment is HTTPS-only before adding it.
        response.set_cookie(key="access_token", value=token, httponly=True, samesite="lax", max_age=3600)
        return response
    finally:
        db.close()
@router.get("/logout")
async def logout(request: Request):
    """Record the logout when a user is identified, then clear the session cookie.

    The audit entry is written to the database named by the ``mode`` field of
    the current user (demo database for ``"demo"``, real database otherwise).
    The caller is always redirected to ``/login`` with the ``access_token``
    cookie removed, whether or not a user was identified.
    """
    current = get_current_user(request)
    if current:
        is_demo = current.get("mode", "reel") == "demo"
        session = SessionLocalDemo() if is_demo else SessionLocal()
        try:
            log_logout(session, request, current)
            session.commit()
        finally:
            session.close()

    redirect = RedirectResponse(url="/login", status_code=302)
    redirect.delete_cookie("access_token")
    return redirect