patchcenter/app/routers/users.py
Khalid MOUTAOUAKIL 8277653c43 PatchCenter v2.0 — Initial commit
Modules: Dashboard, Serveurs, Campagnes, Planning, Specifiques, Settings, Users
Stack: FastAPI + Jinja2 + HTMX + Alpine.js + TailwindCSS + PostgreSQL
Features: Qualys sync, prereqs auto, planning annuel, server specifics, role-based access

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 03:00:12 +02:00

143 lines
5.7 KiB
Python

"""Router users — gestion utilisateurs + permissions par module"""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user
from ..auth import hash_password
from ..config import APP_NAME
# Router that the user-management endpoints in this module register on.
router = APIRouter()
# Jinja2 template loader; the directory is given as a relative path
# (presumably resolved against the process working directory — confirm).
templates = Jinja2Templates(directory="app/templates")
# Module names for which a per-user permission level can be stored. The
# permissions form is read back using one "perm_<module>" field per entry.
MODULES = ["servers", "campaigns", "qualys", "audit", "settings", "users"]
# Accepted permission levels; submitted values outside this list are ignored
# when permissions are saved (presumably ordered least to most privileged —
# confirm).
LEVELS = ["view", "edit", "admin"]
def _get_users_with_perms(db):
    """Return every user together with their per-module permission levels.

    Permissions are loaded with a single query and grouped in Python, rather
    than issuing one query per user.

    Args:
        db: Database session/connection exposing ``execute()``; result rows
            are read by attribute (``row.id``, ``row.module``, ...).

    Returns:
        A list of ``{"user": <row>, "perms": {module: level}}`` dicts ordered
        by username. A user without any permission row gets an empty
        ``perms`` dict.
    """
    users = db.execute(text(
        "SELECT id, username, display_name, email, role, auth_type, is_active, last_login FROM users ORDER BY username"
    )).fetchall()
    if not users:
        return []

    perm_rows = db.execute(text(
        "SELECT user_id, module, level FROM user_permissions"
    )).fetchall()

    # Group permission rows by owning user: {user_id: {module: level}}.
    perms_by_user = {}
    for r in perm_rows:
        perms_by_user.setdefault(r.user_id, {})[r.module] = r.level

    # ``.get(..., {})`` builds a fresh dict per user, so entries never share
    # a mutable default.
    return [{"user": u, "perms": perms_by_user.get(u.id, {})} for u in users]
@router.get("/users", response_class=HTMLResponse)
async def users_page(request: Request, db=Depends(get_db)):
    """Render the user administration page.

    Redirects to ``/login`` when ``get_current_user`` yields no user;
    otherwise renders ``users.html`` with all users, their permissions and
    the known modules/levels. ``saved`` is ``None`` because no action was
    performed.
    """
    current_user = get_current_user(request)
    if current_user:
        context = {
            "request": request,
            "user": current_user,
            "app_name": APP_NAME,
            "users_data": _get_users_with_perms(db),
            "modules": MODULES,
            "levels": LEVELS,
            "saved": None,
        }
        return templates.TemplateResponse("users.html", context)
    return RedirectResponse(url="/login")
@router.post("/users/add", response_class=HTMLResponse)
async def user_add(request: Request, db=Depends(get_db),
                   new_username: str = Form(...), new_display_name: str = Form(...),
                   new_email: str = Form(""), new_password: str = Form(...),
                   new_role: str = Form("operator")):
    """Create a user, seed its default permissions, and re-render the page.

    Args:
        request: Incoming request, used to resolve the current user.
        db: Database session/connection.
        new_username: Login name of the user to create.
        new_display_name: Display name of the user to create.
        new_email: Optional e-mail; an empty value is stored as NULL.
        new_password: Clear-text password, hashed before storage.
        new_role: Role name; selects the default permission set. A role
            without an entry in the defaults gets no permission rows.

    Returns:
        A redirect to ``/login`` when no user is logged in, otherwise the
        rendered ``users.html`` with ``saved`` set to ``"add"``.
    """
    user = get_current_user(request)
    if not user:
        # 303 makes the client follow up with GET /login. The default 307
        # would replay this POST (including the submitted password) to /login.
        return RedirectResponse(url="/login", status_code=303)

    # NOTE(review): only authentication is checked here; no permission check
    # on the "users" module is visible in this handler — confirm it is
    # enforced elsewhere.
    # NOTE(review): a duplicate username or an unexpected role value is not
    # handled here and will surface as whatever the database raises.
    pw_hash = hash_password(new_password)

    # RETURNING hands back the new id from the INSERT itself, avoiding a
    # second lookup by username.
    row = db.execute(text("""
        INSERT INTO users (username, display_name, email, password_hash, role)
        VALUES (:u, :dn, :e, :ph, :r)
        RETURNING id
    """), {"u": new_username, "dn": new_display_name, "e": new_email or None,
           "ph": pw_hash, "r": new_role}).fetchone()

    # Default permissions per role; modules missing from a role's mapping
    # simply get no permission row.
    default_perms = {
        "admin": {m: "admin" for m in MODULES},
        "coordinator": {"servers": "edit", "campaigns": "admin", "qualys": "edit",
                        "audit": "view", "settings": "view", "users": "view"},
        "operator": {"servers": "edit", "campaigns": "edit", "qualys": "view", "audit": "view"},
        "viewer": {"servers": "view", "campaigns": "view", "qualys": "view", "audit": "view"},
    }
    for mod, lvl in default_perms.get(new_role, {}).items():
        db.execute(text(
            "INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l) ON CONFLICT DO NOTHING"
        ), {"uid": row.id, "m": mod, "l": lvl})
    db.commit()

    users_data = _get_users_with_perms(db)
    return templates.TemplateResponse("users.html", {
        "request": request, "user": user, "app_name": APP_NAME,
        "users_data": users_data, "modules": MODULES, "levels": LEVELS,
        "saved": "add",
    })
@router.post("/users/{user_id}/permissions", response_class=HTMLResponse)
async def user_permissions_save(request: Request, user_id: int, db=Depends(get_db)):
    """Replace all permissions of one user with the submitted form values.

    The form is expected to carry one ``perm_<module>`` field per entry of
    ``MODULES``. A missing, empty or unknown level leaves that module without
    a permission row.

    Args:
        request: Incoming request; provides the current user and form data.
        user_id: Id of the user whose permissions are rewritten.
        db: Database session/connection.

    Returns:
        A redirect to ``/login`` when no user is logged in, otherwise the
        rendered ``users.html`` with ``saved`` set to ``"perms_<user_id>"``.
    """
    user = get_current_user(request)
    if not user:
        # 303 turns the follow-up request into GET /login; the default 307
        # would replay this POST and its form body against /login.
        return RedirectResponse(url="/login", status_code=303)

    # NOTE(review): only authentication is checked here; no permission check
    # is visible in this handler — confirm it is enforced elsewhere.
    form = await request.form()

    # Drop the previous permission set, then insert the submitted one; both
    # happen before the single commit below.
    db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id})
    for mod in MODULES:
        lvl = form.get(f"perm_{mod}", "")
        # Membership in LEVELS already rejects the empty default.
        if lvl in LEVELS:
            db.execute(text(
                "INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l)"
            ), {"uid": user_id, "m": mod, "l": lvl})
    db.commit()

    users_data = _get_users_with_perms(db)
    return templates.TemplateResponse("users.html", {
        "request": request, "user": user, "app_name": APP_NAME,
        "users_data": users_data, "modules": MODULES, "levels": LEVELS,
        "saved": f"perms_{user_id}",
    })
@router.post("/users/{user_id}/toggle", response_class=HTMLResponse)
async def user_toggle(request: Request, user_id: int, db=Depends(get_db)):
    """Flip the ``is_active`` flag of one user and re-render the page.

    Args:
        request: Incoming request, used to resolve the current user.
        user_id: Id of the user to activate/deactivate.
        db: Database session/connection.

    Returns:
        A redirect to ``/login`` when no user is logged in, otherwise the
        rendered ``users.html`` with ``saved`` set to ``"toggle"``.
    """
    user = get_current_user(request)
    if not user:
        # 303 turns the follow-up request into GET /login; the default 307
        # would replay this POST against /login.
        return RedirectResponse(url="/login", status_code=303)

    # NOTE(review): no guard is visible here against a user deactivating
    # their own account, nor any permission check — confirm both are
    # handled elsewhere.
    db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id = :id"), {"id": user_id})
    db.commit()

    users_data = _get_users_with_perms(db)
    return templates.TemplateResponse("users.html", {
        "request": request, "user": user, "app_name": APP_NAME,
        "users_data": users_data, "modules": MODULES, "levels": LEVELS,
        "saved": "toggle",
    })
@router.post("/users/{user_id}/password", response_class=HTMLResponse)
async def user_password(request: Request, user_id: int, db=Depends(get_db),
                        new_password: str = Form(...)):
    """Set a new password for one user and re-render the page.

    Args:
        request: Incoming request, used to resolve the current user.
        user_id: Id of the user whose password is replaced.
        db: Database session/connection.
        new_password: Clear-text password, hashed before storage.

    Returns:
        A redirect to ``/login`` when no user is logged in, otherwise the
        rendered ``users.html`` with ``saved`` set to ``"password"``.
    """
    user = get_current_user(request)
    if not user:
        # 303 turns the follow-up request into GET /login. The default 307
        # would replay this POST, forwarding the new password to /login.
        return RedirectResponse(url="/login", status_code=303)

    # NOTE(review): only authentication is checked here; no permission check
    # is visible in this handler — confirm it is enforced elsewhere.
    pw_hash = hash_password(new_password)
    db.execute(text("UPDATE users SET password_hash = :ph WHERE id = :id"),
               {"ph": pw_hash, "id": user_id})
    db.commit()

    users_data = _get_users_with_perms(db)
    return templates.TemplateResponse("users.html", {
        "request": request, "user": user, "app_name": APP_NAME,
        "users_data": users_data, "modules": MODULES, "levels": LEVELS,
        "saved": "password",
    })