"""Router users — gestion utilisateurs + permissions par module""" from fastapi import APIRouter, Request, Depends, Form from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy import text from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context from ..auth import hash_password from ..config import APP_NAME router = APIRouter() templates = Jinja2Templates(directory="app/templates") MODULES = ["servers", "campaigns", "qualys", "audit", "settings", "users", "planning", "specifics"] LEVELS = ["view", "edit", "admin"] def _get_users_with_perms(db): users = db.execute(text( "SELECT id, username, display_name, email, role, auth_type, is_active, last_login FROM users ORDER BY username" )).fetchall() result = [] for u in users: perms = {} rows = db.execute(text( "SELECT module, level FROM user_permissions WHERE user_id = :uid" ), {"uid": u.id}).fetchall() for r in rows: perms[r.module] = r.level result.append({"user": u, "perms": perms}) return result def _check_access(request, db): user = get_current_user(request) if not user: return None, None, RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "users"): return None, None, RedirectResponse(url="/dashboard") return user, perms, None @router.get("/users", response_class=HTMLResponse) async def users_page(request: Request, db=Depends(get_db)): user, perms, redirect = _check_access(request, db) if redirect: return redirect users_data = _get_users_with_perms(db) ctx = base_context(request, db, user) ctx.update({ "app_name": APP_NAME, "users_data": users_data, "modules": MODULES, "levels": LEVELS, "can_edit_users": can_edit(perms, "users"), "msg": request.query_params.get("msg"), }) return templates.TemplateResponse("users.html", ctx) @router.post("/users/add") async def user_add(request: Request, db=Depends(get_db), new_username: str = Form(...), new_display_name: str = Form(...), new_email: str = Form(""), new_password: str = Form(...), new_role: str = Form("operator")): user, perms, redirect = _check_access(request, db) if redirect: return redirect if not can_edit(perms, "users"): return RedirectResponse(url="/users?msg=forbidden", status_code=303) # Verifier si username existe deja existing = db.execute(text("SELECT id, is_active FROM users WHERE LOWER(username) = LOWER(:u)"), {"u": new_username.strip()}).fetchone() if existing: if not existing.is_active: return RedirectResponse(url=f"/users?msg=exists_inactive", status_code=303) return RedirectResponse(url=f"/users?msg=exists", status_code=303) pw_hash = hash_password(new_password) db.execute(text(""" INSERT INTO users (username, display_name, email, password_hash, role) VALUES (:u, :dn, :e, :ph, :r) """), {"u": new_username.strip(), "dn": new_display_name, "e": new_email or None, "ph": pw_hash, "r": new_role}) row = db.execute(text("SELECT id FROM users WHERE username = :u"), {"u": new_username.strip()}).fetchone() if row: default_perms = { "admin": {m: "admin" for m in MODULES}, "coordinator": {"servers": "admin", "campaigns": "admin", "qualys": "admin", "audit": "admin", "settings": "view", "users": "view", "planning": "admin", "specifics": "admin"}, "operator": {"servers": "admin", "campaigns": "view", "qualys": "admin", "audit": "admin", "settings": "view", "planning": "view", "specifics": "admin"}, "viewer": {"servers": "view", "campaigns": "view", "audit": "view", "planning": "view"}, } for mod, lvl in default_perms.get(new_role, {}).items(): db.execute(text( "INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l) ON CONFLICT DO NOTHING" ), {"uid": row.id, "m": mod, "l": lvl}) db.commit() return RedirectResponse(url="/users?msg=added", status_code=303) @router.post("/users/{user_id}/permissions") async def user_permissions_save(request: Request, user_id: int, db=Depends(get_db)): user, perms, redirect = _check_access(request, db) if redirect: return redirect if not can_edit(perms, "users"): return RedirectResponse(url="/users?msg=forbidden", status_code=303) form = await request.form() db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id}) for mod in MODULES: lvl = form.get(f"perm_{mod}", "") if lvl and lvl in LEVELS: db.execute(text( "INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l)" ), {"uid": user_id, "m": mod, "l": lvl}) db.commit() return RedirectResponse(url=f"/users?msg=perms_saved", status_code=303) @router.post("/users/{user_id}/edit") async def user_edit(request: Request, user_id: int, db=Depends(get_db), display_name: str = Form(""), email: str = Form(""), role: str = Form("")): user, perms, redirect = _check_access(request, db) if redirect: return redirect if not can_edit(perms, "users"): return RedirectResponse(url="/users?msg=forbidden", status_code=303) updates = [] params = {"id": user_id} if display_name: updates.append("display_name = :dn"); params["dn"] = display_name if email: updates.append("email = :em"); params["em"] = email if role: updates.append("role = :r"); params["r"] = role if updates: db.execute(text(f"UPDATE users SET {', '.join(updates)} WHERE id = :id"), params) db.commit() return RedirectResponse(url="/users?msg=edited", status_code=303) @router.post("/users/{user_id}/toggle") async def user_toggle(request: Request, user_id: int, db=Depends(get_db)): user, perms, redirect = _check_access(request, db) if redirect: return redirect if not can_edit(perms, "users"): return RedirectResponse(url="/users?msg=forbidden", status_code=303) # Empecher de se desactiver soi-meme if user_id == user.get("uid"): return RedirectResponse(url="/users?msg=cant_self", status_code=303) db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id = :id"), {"id": user_id}) db.commit() return RedirectResponse(url="/users?msg=toggled", status_code=303) @router.post("/users/{user_id}/password") async def user_password(request: Request, user_id: int, db=Depends(get_db), new_password: str = Form(...)): user, perms, redirect = _check_access(request, db) if redirect: return redirect if not can_edit(perms, "users"): return RedirectResponse(url="/users?msg=forbidden", status_code=303) pw_hash = hash_password(new_password) db.execute(text("UPDATE users SET password_hash = :ph WHERE id = :id"), {"ph": pw_hash, "id": user_id}) db.commit() return RedirectResponse(url="/users?msg=password_changed", status_code=303) @router.post("/users/{user_id}/delete") async def user_delete(request: Request, user_id: int, db=Depends(get_db)): user, perms, redirect = _check_access(request, db) if redirect: return redirect if not can_admin(perms, "users"): return RedirectResponse(url="/users?msg=forbidden", status_code=303) if user_id == user.get("uid"): return RedirectResponse(url="/users?msg=cant_self", status_code=303) db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id}) db.execute(text("DELETE FROM users WHERE id = :id"), {"id": user_id}) db.commit() return RedirectResponse(url="/users?msg=deleted", status_code=303)