patchcenter/app/routers/users.py
Admin MPCZ 8479d7280e Users/Contacts: workflow profils + LDAP + sync iTop + etat aligne
- Users: 4 profils (admin/coordinator/operator/viewer) remplacent la matrix
- /users/add: picker contacts iTop (plus de creation libre)
- /me/change-password: flow force_password_change
- LDAP: service + section settings + option login
- Sync iTop contacts: filtre par teams (SecOps/iPOP/Externe/DSI/Admin DSI)
- Auto-desactivation users si contact inactif
- etat: alignement sur enum iTop (production/implementation/stock/obsolete)
- Menu: Contacts dans Administration, Serveurs en groupe repliable
- Audit bases: demo/prod via JWT mode

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 18:50:43 +02:00

288 lines
12 KiB
Python

"""Router users — gestion utilisateurs par profil + picker de contacts iTop.
Seul l'admin peut gérer les utilisateurs.
Les utilisateurs sont créés depuis les contacts synchronisés d'iTop (sauf admin local).
"""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..auth import hash_password
from ..services.profile_service import PROFILES, PROFILE_LABELS, PROFILE_DESCRIPTIONS
from ..config import APP_NAME
# Router object on which every endpoint of this module is registered.
router = APIRouter()
# Template loader; the directory is given as a relative path.
templates = Jinja2Templates(directory="app/templates")
# Profile names accepted by the endpoints below and offered to the templates.
# NOTE(review): presumably mirrors the keys of the PROFILES dict — confirm they stay in sync.
ROLES = ["admin", "coordinator", "operator", "viewer"]
def _get_users(db):
    """Return all user rows enriched with the linked contact's team and function.

    Users without a matching contact are kept (LEFT JOIN). Rows are ordered
    with active accounts first, then alphabetically by username.
    """
    listing_query = text("""
        SELECT u.id, u.username, u.display_name, u.email, u.role, u.auth_type,
               u.is_active, u.last_login, u.itop_person_id, u.force_password_change,
               c.team as contact_team, c.function as contact_function
        FROM users u
        LEFT JOIN contacts c ON c.itop_id = u.itop_person_id
        ORDER BY u.is_active DESC, u.username
    """)
    return db.execute(listing_query).fetchall()
def _check_access(request, db, require_edit=False):
    """Resolve the current user and check access to the "users" module.

    Args:
        request: Incoming request, handed to ``get_current_user``.
        db: Database session, handed to ``get_user_perms``.
        require_edit: When true, edit permission on "users" is required in
            addition to view permission.

    Returns:
        A ``(user, perms, redirect)`` tuple. On success ``redirect`` is
        ``None``. On failure ``user`` and ``perms`` are ``None`` and
        ``redirect`` is the response the caller must return as-is.
    """
    user = get_current_user(request)
    if not user:
        # Explicit 303: this helper also guards POST handlers, and the default
        # 307 would make the browser re-POST the submitted form to /login.
        return None, None, RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_view(perms, "users"):
        # Same reasoning as above: never replay a POST body against /dashboard.
        return None, None, RedirectResponse(url="/dashboard", status_code=303)
    if require_edit and not can_edit(perms, "users"):
        return None, None, RedirectResponse(url="/users?msg=forbidden", status_code=303)
    return user, perms, None
@router.get("/me/change-password", response_class=HTMLResponse)
async def me_change_password_page(request: Request, db=Depends(get_db)):
    """Render the password change form for the logged-in user.

    Anonymous visitors are redirected to the login page. The form is rendered
    without an error message and with an empty permission map.
    """
    current = get_current_user(request)
    if current:
        form_context = {
            "request": request, "app_name": APP_NAME, "user": current,
            "error": None, "perms": {},
        }
        return templates.TemplateResponse("change_password.html", form_context)
    return RedirectResponse(url="/login")
@router.post("/me/change-password")
async def me_change_password(request: Request, db=Depends(get_db),
                             current_password: str = Form(...),
                             new_password: str = Form(...),
                             confirm_password: str = Form(...)):
    """Change the logged-in user's own password.

    The submitted current password is verified against the stored hash, then
    the new password must match its confirmation, be at least 8 characters
    long and differ from the current one. On a validation failure the form is
    rendered again with the error message. On success the new hash is stored,
    ``force_password_change`` is cleared and the user is sent to the dashboard.

    Returns:
        A redirect (login, logout or dashboard) or the re-rendered form.
    """
    user = get_current_user(request)
    if not user:
        # Explicit 303: with the default 307 the browser would re-POST this
        # form (current and new passwords included) to /login.
        return RedirectResponse(url="/login", status_code=303)

    from ..auth import verify_password

    row = db.execute(text("SELECT id, password_hash FROM users WHERE id=:uid"),
                     {"uid": user.get("uid")}).fetchone()
    if not row:
        # The session refers to a user that no longer exists: end the session.
        return RedirectResponse(url="/logout", status_code=303)

    err = None
    # A NULL stored hash is compared as an empty string.
    if not verify_password(current_password, row.password_hash or ""):
        err = "Mot de passe actuel incorrect"
    elif new_password != confirm_password:
        err = "Les deux mots de passe ne correspondent pas"
    elif len(new_password) < 8:
        err = "Le nouveau mot de passe doit faire au moins 8 caractères"
    elif new_password == current_password:
        err = "Le nouveau mot de passe doit être différent de l'actuel"

    if err:
        return templates.TemplateResponse("change_password.html", {
            "request": request, "app_name": APP_NAME, "user": user,
            "error": err, "perms": {},
        })

    pw_hash = hash_password(new_password)
    db.execute(text("""UPDATE users SET password_hash=:ph, force_password_change=false,
                       updated_at=NOW() WHERE id=:uid"""),
               {"ph": pw_hash, "uid": user.get("uid")})
    db.commit()
    return RedirectResponse(url="/dashboard?msg=password_changed", status_code=303)
@router.get("/users", response_class=HTMLResponse)
async def users_page(request: Request, db=Depends(get_db)):
    """Render the user list together with the number of contacts still addable.

    Requires view permission on "users"; otherwise the redirect produced by
    ``_check_access`` is returned.
    """
    actor, perms, denied = _check_access(request, db)
    if denied:
        return denied

    rows = _get_users(db)

    # Active contacts that carry an iTop id and are not linked to any user yet.
    pending_query = text("""
        SELECT COUNT(*) FROM contacts c
        WHERE c.itop_id IS NOT NULL AND c.is_active = true
        AND NOT EXISTS (SELECT 1 FROM users u WHERE u.itop_person_id = c.itop_id)
    """)
    pending_total = db.execute(pending_query).scalar()

    page_ctx = base_context(request, db, actor)
    extra = {
        "app_name": APP_NAME, "users": rows,
        "available_count": pending_total,
        "roles": ROLES, "profile_labels": PROFILE_LABELS,
        "profile_descriptions": PROFILE_DESCRIPTIONS,
        "can_edit_users": can_edit(perms, "users"),
        "can_admin_users": can_admin(perms, "users"),
        "msg": request.query_params.get("msg"),
    }
    page_ctx.update(extra)
    return templates.TemplateResponse("users.html", page_ctx)
@router.get("/users/add", response_class=HTMLResponse)
async def users_add_page(request: Request, db=Depends(get_db),
                         search: str = "", team: str = ""):
    """Render the contact picker used to create a user from a contact.

    Only active contacts that carry an iTop id and are not yet linked to a
    user are listed (at most 200).

    Args:
        search: Optional case-insensitive substring matched against the
            contact name or email.
        team: Optional exact team name to filter on.

    Returns:
        The rendered ``users_add.html`` page, or the redirect produced by
        ``_check_access`` when edit permission on "users" is missing.
    """
    user, perms, redirect = _check_access(request, db, require_edit=True)
    if redirect:
        return redirect

    # Only fixed SQL fragments are concatenated; user input goes through bind
    # parameters.
    where = ["c.itop_id IS NOT NULL", "c.is_active = true",
             "NOT EXISTS (SELECT 1 FROM users u WHERE u.itop_person_id = c.itop_id)"]
    params = {}
    if search:
        where.append("(c.name ILIKE :s OR c.email ILIKE :s)")
        params["s"] = f"%{search}%"
    if team:
        where.append("c.team = :t")
        params["t"] = team
    wc = " AND ".join(where)

    contacts = db.execute(text(f"""
        SELECT c.id, c.itop_id, c.name, c.email, c.telephone, c.team, c.function, c.role
        FROM contacts c WHERE {wc}
        ORDER BY c.team, c.name LIMIT 200
    """), params).fetchall()

    # Distinct non-empty team names, used to populate the team filter.
    teams = db.execute(text("""
        SELECT DISTINCT team FROM contacts WHERE team IS NOT NULL AND team != ''
        ORDER BY team
    """)).fetchall()

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME, "contacts": contacts, "teams": teams,
        "roles": ROLES, "profile_labels": PROFILE_LABELS,
        "profile_descriptions": PROFILE_DESCRIPTIONS,
        "search": search, "team_filter": team,
        # The POST handler redirects back here with ?msg=contact_required;
        # forward it to the template the same way the /users page does.
        "msg": request.query_params.get("msg"),
    })
    return templates.TemplateResponse("users_add.html", ctx)
@router.post("/users/add")
async def users_add(request: Request, db=Depends(get_db),
                    contact_id: str = Form(""), role: str = Form("viewer"),
                    username: str = Form(""), password: str = Form(""),
                    auth_type: str = Form("local"),
                    force_change: str = Form("")):
    """Create a user account from an existing contact.

    Args:
        contact_id: Primary key of the contact row, as submitted by the form.
        role: Profile to assign; must be one of ``ROLES``.
        username: Optional login; derived from the contact when left blank.
        password: Optional initial password; a default is generated when blank.
        auth_type: ``"local"`` or ``"ldap"``.
        force_change: ``"on"`` forces a password change; it is also forced
            whenever no password was supplied.

    Returns:
        A 303 redirect carrying a ``msg`` code describing the outcome, or the
        redirect produced by ``_check_access``.
    """
    user, perms, redirect = _check_access(request, db, require_edit=True)
    if redirect:
        return redirect
    if role not in ROLES:
        return RedirectResponse(url="/users?msg=invalid_role", status_code=303)
    # Same whitelist as the /users/{user_id}/auth_type endpoint, so that an
    # arbitrary value can never be stored at creation time.
    if auth_type not in ("local", "ldap"):
        return RedirectResponse(url="/users?msg=invalid", status_code=303)

    # Look up the contact. isdecimal() (unlike isdigit()) only accepts
    # characters that int() can actually parse, e.g. it rejects "²".
    contact = None
    if contact_id and contact_id.strip().isdecimal():
        contact = db.execute(text(
            "SELECT id, itop_id, name, email FROM contacts WHERE id = :cid"
        ), {"cid": int(contact_id)}).fetchone()
    if not contact:
        return RedirectResponse(url="/users/add?msg=contact_required", status_code=303)

    # Derive the username when none was given: email local part, else the
    # lower-cased name with spaces turned into dots.
    username = username.strip()
    if not username:
        if contact.email:
            username = contact.email.split("@")[0]
        else:
            username = (contact.name or "").lower().replace(" ", ".")
        username = username.strip()
    if not username:
        # Neither the form nor the contact yields a usable login.
        return RedirectResponse(url="/users/add?msg=username_required", status_code=303)

    # Reject duplicates on username (case-insensitive) or on the linked contact.
    existing = db.execute(text("SELECT id FROM users WHERE LOWER(username)=LOWER(:u) OR itop_person_id=:iid"),
                          {"u": username, "iid": contact.itop_id}).fetchone()
    if existing:
        return RedirectResponse(url="/users?msg=exists", status_code=303)

    # NOTE(review): the fallback password is predictable ("ChangeMe!" + iTop id).
    # It is kept because the onboarding flow may rely on it, but a random
    # one-time secret would be safer — confirm with the owners.
    pw_hash = hash_password(password) if password else hash_password("ChangeMe!" + str(contact.itop_id))
    fpc = force_change == "on" or not password

    db.execute(text("""
        INSERT INTO users (username, display_name, email, password_hash, role,
                           auth_type, itop_person_id, is_active, force_password_change)
        VALUES (:u, :dn, :e, :ph, :r, :at, :iid, true, :fpc)
    """), {
        "u": username, "dn": contact.name, "e": contact.email,
        "ph": pw_hash, "r": role, "at": auth_type,
        "iid": contact.itop_id, "fpc": fpc,
    })
    db.commit()
    return RedirectResponse(url="/users?msg=added", status_code=303)
@router.post("/users/{user_id}/role")
async def user_change_role(request: Request, user_id: int, db=Depends(get_db),
                           role: str = Form(...)):
    """Assign a new profile to the user identified by ``user_id``.

    Requires edit permission on "users". Unknown roles are rejected, and the
    acting user cannot move their own account away from "admin".
    """
    actor, _perms, denied = _check_access(request, db, require_edit=True)
    if denied:
        return denied
    if role not in ROLES:
        return RedirectResponse(url="/users?msg=invalid_role", status_code=303)

    # Guard against the acting user removing their own admin profile.
    demoting_self = user_id == actor.get("uid") and role != "admin"
    if demoting_self:
        return RedirectResponse(url="/users?msg=cant_demote_self", status_code=303)

    update_stmt = text("UPDATE users SET role=:r, updated_at=NOW() WHERE id=:id")
    db.execute(update_stmt, {"r": role, "id": user_id})
    db.commit()
    return RedirectResponse(url="/users?msg=role_changed", status_code=303)
@router.post("/users/{user_id}/toggle")
async def user_toggle(request: Request, user_id: int, db=Depends(get_db)):
    """Flip the ``is_active`` flag of the user identified by ``user_id``.

    Requires edit permission on "users". The acting user cannot toggle their
    own account.

    Returns:
        A 303 redirect to ``/users`` carrying a ``msg`` code, or the redirect
        produced by ``_check_access``.
    """
    user, perms, redirect = _check_access(request, db, require_edit=True)
    if redirect:
        return redirect
    if user_id == user.get("uid"):
        return RedirectResponse(url="/users?msg=cant_self", status_code=303)
    # updated_at is refreshed like in every other UPDATE on users in this module.
    db.execute(text("UPDATE users SET is_active = NOT is_active, updated_at=NOW() WHERE id=:id"),
               {"id": user_id})
    db.commit()
    return RedirectResponse(url="/users?msg=toggled", status_code=303)
@router.post("/users/{user_id}/password")
async def user_password(request: Request, user_id: int, db=Depends(get_db),
                        new_password: str = Form(...),
                        force_change: str = Form("")):
    """Set a new password for the user identified by ``user_id``.

    Requires edit permission on "users". ``force_password_change`` is set to
    true only when the ``force_change`` field equals ``"on"``.
    """
    actor, _perms, denied = _check_access(request, db, require_edit=True)
    if denied:
        return denied

    bindings = {
        "ph": hash_password(new_password),
        "fpc": force_change == "on",
        "id": user_id,
    }
    reset_stmt = text("""UPDATE users SET password_hash=:ph, force_password_change=:fpc,
                         updated_at=NOW() WHERE id=:id""")
    db.execute(reset_stmt, bindings)
    db.commit()
    return RedirectResponse(url="/users?msg=password_changed", status_code=303)
@router.post("/users/{user_id}/auth_type")
async def user_auth_type(request: Request, user_id: int, db=Depends(get_db),
                         auth_type: str = Form(...)):
    """Switch the authentication type of the user identified by ``user_id``.

    Requires edit permission on "users". Only "local" and "ldap" are accepted.
    """
    actor, _perms, denied = _check_access(request, db, require_edit=True)
    if denied:
        return denied

    accepted = ("local", "ldap")
    if auth_type in accepted:
        switch_stmt = text("UPDATE users SET auth_type=:at, updated_at=NOW() WHERE id=:id")
        db.execute(switch_stmt, {"at": auth_type, "id": user_id})
        db.commit()
        return RedirectResponse(url="/users?msg=auth_changed", status_code=303)
    return RedirectResponse(url="/users?msg=invalid", status_code=303)
@router.post("/users/{user_id}/delete")
async def user_delete(request: Request, user_id: int, db=Depends(get_db)):
    """Delete the user identified by ``user_id`` along with its permission rows.

    Requires admin permission on "users". The acting user cannot delete their
    own account, and the local account named "admin" is never deleted.
    """
    actor, perms, denied = _check_access(request, db)
    if denied:
        return denied
    if not can_admin(perms, "users"):
        return RedirectResponse(url="/users?msg=forbidden", status_code=303)
    if user_id == actor.get("uid"):
        return RedirectResponse(url="/users?msg=cant_self", status_code=303)

    # The built-in administrator is recognised by username and auth type,
    # not by its numeric id.
    target = db.execute(text("SELECT username, auth_type FROM users WHERE id=:id"),
                        {"id": user_id}).fetchone()
    if target and target.username == "admin" and target.auth_type == "local":
        return RedirectResponse(url="/users?msg=cant_delete_admin", status_code=303)

    # Permission rows go first, then the user row itself.
    removals = (
        ("DELETE FROM user_permissions WHERE user_id=:uid", "uid"),
        ("DELETE FROM users WHERE id=:id", "id"),
    )
    for statement, bind_name in removals:
        db.execute(text(statement), {bind_name: user_id})
    db.commit()
    return RedirectResponse(url="/users?msg=deleted", status_code=303)