- Users: 4 profils (admin/coordinator/operator/viewer) remplacent la matrix - /users/add: picker contacts iTop (plus de creation libre) - /me/change-password: flow force_password_change - LDAP: service + section settings + option login - Sync iTop contacts: filtre par teams (SecOps/iPOP/Externe/DSI/Admin DSI) - Auto-desactivation users si contact inactif - etat: alignement sur enum iTop (production/implementation/stock/obsolete) - Menu: Contacts dans Administration, Serveurs en groupe repliable - Audit bases: demo/prod via JWT mode Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
288 lines
12 KiB
Python
288 lines
12 KiB
Python
"""Router users — gestion utilisateurs par profil + picker de contacts iTop.
|
|
|
|
Seul l'admin peut gérer les utilisateurs.
|
|
Les utilisateurs sont créés depuis les contacts synchronisés d'iTop (sauf admin local).
|
|
"""
|
|
from fastapi import APIRouter, Request, Depends, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
|
|
from ..auth import hash_password
|
|
from ..services.profile_service import PROFILES, PROFILE_LABELS, PROFILE_DESCRIPTIONS
|
|
from ..config import APP_NAME
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
# Profils disponibles (keys du PROFILES dict)
|
|
ROLES = ["admin", "coordinator", "operator", "viewer"]
|
|
|
|
|
|
def _get_users(db):
|
|
users = db.execute(text("""
|
|
SELECT u.id, u.username, u.display_name, u.email, u.role, u.auth_type,
|
|
u.is_active, u.last_login, u.itop_person_id, u.force_password_change,
|
|
c.team as contact_team, c.function as contact_function
|
|
FROM users u
|
|
LEFT JOIN contacts c ON c.itop_id = u.itop_person_id
|
|
ORDER BY u.is_active DESC, u.username
|
|
""")).fetchall()
|
|
return users
|
|
|
|
|
|
def _check_access(request, db, require_edit=False):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return None, None, RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "users"):
|
|
return None, None, RedirectResponse(url="/dashboard")
|
|
if require_edit and not can_edit(perms, "users"):
|
|
return None, None, RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
return user, perms, None
|
|
|
|
|
|
@router.get("/me/change-password", response_class=HTMLResponse)
|
|
async def me_change_password_page(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
return templates.TemplateResponse("change_password.html", {
|
|
"request": request, "app_name": APP_NAME, "user": user,
|
|
"error": None, "perms": {},
|
|
})
|
|
|
|
|
|
@router.post("/me/change-password")
|
|
async def me_change_password(request: Request, db=Depends(get_db),
|
|
current_password: str = Form(...),
|
|
new_password: str = Form(...),
|
|
confirm_password: str = Form(...)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
|
|
from ..auth import verify_password
|
|
row = db.execute(text("SELECT id, password_hash FROM users WHERE id=:uid"),
|
|
{"uid": user.get("uid")}).fetchone()
|
|
if not row:
|
|
return RedirectResponse(url="/logout", status_code=303)
|
|
|
|
err = None
|
|
if not verify_password(current_password, row.password_hash or ""):
|
|
err = "Mot de passe actuel incorrect"
|
|
elif new_password != confirm_password:
|
|
err = "Les deux mots de passe ne correspondent pas"
|
|
elif len(new_password) < 8:
|
|
err = "Le nouveau mot de passe doit faire au moins 8 caractères"
|
|
elif new_password == current_password:
|
|
err = "Le nouveau mot de passe doit être différent de l'actuel"
|
|
|
|
if err:
|
|
return templates.TemplateResponse("change_password.html", {
|
|
"request": request, "app_name": APP_NAME, "user": user,
|
|
"error": err, "perms": {},
|
|
})
|
|
|
|
pw_hash = hash_password(new_password)
|
|
db.execute(text("""UPDATE users SET password_hash=:ph, force_password_change=false,
|
|
updated_at=NOW() WHERE id=:uid"""),
|
|
{"ph": pw_hash, "uid": user.get("uid")})
|
|
db.commit()
|
|
return RedirectResponse(url="/dashboard?msg=password_changed", status_code=303)
|
|
|
|
|
|
@router.get("/users", response_class=HTMLResponse)
|
|
async def users_page(request: Request, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
|
|
users = _get_users(db)
|
|
|
|
# Compter les contacts disponibles pour ajout (pas encore users)
|
|
available_count = db.execute(text("""
|
|
SELECT COUNT(*) FROM contacts c
|
|
WHERE c.itop_id IS NOT NULL AND c.is_active = true
|
|
AND NOT EXISTS (SELECT 1 FROM users u WHERE u.itop_person_id = c.itop_id)
|
|
""")).scalar()
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "users": users,
|
|
"available_count": available_count,
|
|
"roles": ROLES, "profile_labels": PROFILE_LABELS,
|
|
"profile_descriptions": PROFILE_DESCRIPTIONS,
|
|
"can_edit_users": can_edit(perms, "users"),
|
|
"can_admin_users": can_admin(perms, "users"),
|
|
"msg": request.query_params.get("msg"),
|
|
})
|
|
return templates.TemplateResponse("users.html", ctx)
|
|
|
|
|
|
@router.get("/users/add", response_class=HTMLResponse)
|
|
async def users_add_page(request: Request, db=Depends(get_db),
|
|
search: str = "", team: str = ""):
|
|
user, perms, redirect = _check_access(request, db, require_edit=True)
|
|
if redirect:
|
|
return redirect
|
|
|
|
# Lister les contacts iTop non-encore-users
|
|
where = ["c.itop_id IS NOT NULL", "c.is_active = true",
|
|
"NOT EXISTS (SELECT 1 FROM users u WHERE u.itop_person_id = c.itop_id)"]
|
|
params = {}
|
|
if search:
|
|
where.append("(c.name ILIKE :s OR c.email ILIKE :s)")
|
|
params["s"] = f"%{search}%"
|
|
if team:
|
|
where.append("c.team = :t")
|
|
params["t"] = team
|
|
|
|
wc = " AND ".join(where)
|
|
contacts = db.execute(text(f"""
|
|
SELECT c.id, c.itop_id, c.name, c.email, c.telephone, c.team, c.function, c.role
|
|
FROM contacts c WHERE {wc}
|
|
ORDER BY c.team, c.name LIMIT 200
|
|
"""), params).fetchall()
|
|
|
|
# Liste des teams disponibles pour filtrer
|
|
teams = db.execute(text("""
|
|
SELECT DISTINCT team FROM contacts WHERE team IS NOT NULL AND team != ''
|
|
ORDER BY team
|
|
""")).fetchall()
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "contacts": contacts, "teams": teams,
|
|
"roles": ROLES, "profile_labels": PROFILE_LABELS,
|
|
"profile_descriptions": PROFILE_DESCRIPTIONS,
|
|
"search": search, "team_filter": team,
|
|
})
|
|
return templates.TemplateResponse("users_add.html", ctx)
|
|
|
|
|
|
@router.post("/users/add")
|
|
async def users_add(request: Request, db=Depends(get_db),
|
|
contact_id: str = Form(""), role: str = Form("viewer"),
|
|
username: str = Form(""), password: str = Form(""),
|
|
auth_type: str = Form("local"),
|
|
force_change: str = Form("")):
|
|
user, perms, redirect = _check_access(request, db, require_edit=True)
|
|
if redirect:
|
|
return redirect
|
|
if role not in ROLES:
|
|
return RedirectResponse(url="/users?msg=invalid_role", status_code=303)
|
|
|
|
# Récupérer le contact iTop
|
|
contact = None
|
|
if contact_id and contact_id.strip().isdigit():
|
|
contact = db.execute(text(
|
|
"SELECT id, itop_id, name, email FROM contacts WHERE id = :cid"
|
|
), {"cid": int(contact_id)}).fetchone()
|
|
if not contact:
|
|
return RedirectResponse(url="/users/add?msg=contact_required", status_code=303)
|
|
|
|
# Générer username si non fourni (email avant @)
|
|
if not username.strip():
|
|
username = contact.email.split("@")[0] if contact.email else contact.name.lower().replace(" ", ".")
|
|
|
|
# Verifier duplicats
|
|
existing = db.execute(text("SELECT id FROM users WHERE LOWER(username)=LOWER(:u) OR itop_person_id=:iid"),
|
|
{"u": username.strip(), "iid": contact.itop_id}).fetchone()
|
|
if existing:
|
|
return RedirectResponse(url="/users?msg=exists", status_code=303)
|
|
|
|
pw_hash = hash_password(password) if password else hash_password("ChangeMe!" + str(contact.itop_id))
|
|
fpc = True if force_change == "on" else (not password)
|
|
|
|
db.execute(text("""
|
|
INSERT INTO users (username, display_name, email, password_hash, role,
|
|
auth_type, itop_person_id, is_active, force_password_change)
|
|
VALUES (:u, :dn, :e, :ph, :r, :at, :iid, true, :fpc)
|
|
"""), {
|
|
"u": username.strip(), "dn": contact.name, "e": contact.email,
|
|
"ph": pw_hash, "r": role, "at": auth_type,
|
|
"iid": contact.itop_id, "fpc": fpc,
|
|
})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=added", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/role")
|
|
async def user_change_role(request: Request, user_id: int, db=Depends(get_db),
|
|
role: str = Form(...)):
|
|
user, perms, redirect = _check_access(request, db, require_edit=True)
|
|
if redirect:
|
|
return redirect
|
|
if role not in ROLES:
|
|
return RedirectResponse(url="/users?msg=invalid_role", status_code=303)
|
|
# Empêche un admin de se rétrograder lui-même
|
|
if user_id == user.get("uid") and role != "admin":
|
|
return RedirectResponse(url="/users?msg=cant_demote_self", status_code=303)
|
|
db.execute(text("UPDATE users SET role=:r, updated_at=NOW() WHERE id=:id"),
|
|
{"r": role, "id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=role_changed", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/toggle")
|
|
async def user_toggle(request: Request, user_id: int, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db, require_edit=True)
|
|
if redirect:
|
|
return redirect
|
|
if user_id == user.get("uid"):
|
|
return RedirectResponse(url="/users?msg=cant_self", status_code=303)
|
|
db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id=:id"), {"id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=toggled", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/password")
|
|
async def user_password(request: Request, user_id: int, db=Depends(get_db),
|
|
new_password: str = Form(...),
|
|
force_change: str = Form("")):
|
|
user, perms, redirect = _check_access(request, db, require_edit=True)
|
|
if redirect:
|
|
return redirect
|
|
pw_hash = hash_password(new_password)
|
|
fpc = (force_change == "on")
|
|
db.execute(text("""UPDATE users SET password_hash=:ph, force_password_change=:fpc,
|
|
updated_at=NOW() WHERE id=:id"""),
|
|
{"ph": pw_hash, "fpc": fpc, "id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=password_changed", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/auth_type")
|
|
async def user_auth_type(request: Request, user_id: int, db=Depends(get_db),
|
|
auth_type: str = Form(...)):
|
|
user, perms, redirect = _check_access(request, db, require_edit=True)
|
|
if redirect:
|
|
return redirect
|
|
if auth_type not in ("local", "ldap"):
|
|
return RedirectResponse(url="/users?msg=invalid", status_code=303)
|
|
db.execute(text("UPDATE users SET auth_type=:at, updated_at=NOW() WHERE id=:id"),
|
|
{"at": auth_type, "id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=auth_changed", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/delete")
|
|
async def user_delete(request: Request, user_id: int, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_admin(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
if user_id == user.get("uid"):
|
|
return RedirectResponse(url="/users?msg=cant_self", status_code=303)
|
|
# Protéger l'admin local (id=1)
|
|
row = db.execute(text("SELECT username, auth_type FROM users WHERE id=:id"), {"id": user_id}).fetchone()
|
|
if row and row.username == "admin" and row.auth_type == "local":
|
|
return RedirectResponse(url="/users?msg=cant_delete_admin", status_code=303)
|
|
db.execute(text("DELETE FROM user_permissions WHERE user_id=:uid"), {"uid": user_id})
|
|
db.execute(text("DELETE FROM users WHERE id=:id"), {"id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=deleted", status_code=303)
|