patchcenter/app/routers/users.py
Khalid MOUTAOUAKIL 53c393b49b Permissions DB, créneaux auto, assignations, audit Splunk, accents
- Permissions 100% depuis user_permissions (plus de hardcode)
- Middleware injecte perms dans chaque requête
- Créneaux auto: 09h-12h30 / 14h-16h45, pas 15min, hprod lun-mar, prod mer-jeu
- Assignations par défaut: par domaine, app_type, zone, serveur (table default_assignments)
- Auto-liaison app_group: même intervenant recette+prod
- Audit Splunk: /var/log/patchcenter_audit.json (JSON one-line par event)
- Login/logout/campagnes/prereqs loggés en base + fichier
- Page erreur maintenance (500/404) avec contact SecOps
- Accents français dans toute lUI
- Operator affiché comme Intervenant
- Session 1h, redirect / vers dashboard si connecté
- Demo mode prereqs (DEMO_MODE=True)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 15:25:43 +02:00

191 lines
8.0 KiB
Python

"""Router users — gestion utilisateurs + permissions par module"""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..auth import hash_password
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
MODULES = ["servers", "campaigns", "qualys", "audit", "settings", "users", "planning", "specifics"]
LEVELS = ["view", "edit", "admin"]
def _get_users_with_perms(db):
users = db.execute(text(
"SELECT id, username, display_name, email, role, auth_type, is_active, last_login FROM users ORDER BY username"
)).fetchall()
result = []
for u in users:
perms = {}
rows = db.execute(text(
"SELECT module, level FROM user_permissions WHERE user_id = :uid"
), {"uid": u.id}).fetchall()
for r in rows:
perms[r.module] = r.level
result.append({"user": u, "perms": perms})
return result
def _check_access(request, db):
user = get_current_user(request)
if not user:
return None, None, RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_view(perms, "users"):
return None, None, RedirectResponse(url="/dashboard")
return user, perms, None
@router.get("/users", response_class=HTMLResponse)
async def users_page(request: Request, db=Depends(get_db)):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
users_data = _get_users_with_perms(db)
ctx = base_context(request, db, user)
ctx.update({
"app_name": APP_NAME, "users_data": users_data,
"modules": MODULES, "levels": LEVELS,
"can_edit_users": can_edit(perms, "users"),
"msg": request.query_params.get("msg"),
})
return templates.TemplateResponse("users.html", ctx)
@router.post("/users/add")
async def user_add(request: Request, db=Depends(get_db),
new_username: str = Form(...), new_display_name: str = Form(...),
new_email: str = Form(""), new_password: str = Form(...),
new_role: str = Form("operator")):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
if not can_edit(perms, "users"):
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
# Verifier si username existe deja
existing = db.execute(text("SELECT id, is_active FROM users WHERE LOWER(username) = LOWER(:u)"),
{"u": new_username.strip()}).fetchone()
if existing:
if not existing.is_active:
return RedirectResponse(url=f"/users?msg=exists_inactive", status_code=303)
return RedirectResponse(url=f"/users?msg=exists", status_code=303)
pw_hash = hash_password(new_password)
db.execute(text("""
INSERT INTO users (username, display_name, email, password_hash, role)
VALUES (:u, :dn, :e, :ph, :r)
"""), {"u": new_username.strip(), "dn": new_display_name, "e": new_email or None,
"ph": pw_hash, "r": new_role})
row = db.execute(text("SELECT id FROM users WHERE username = :u"), {"u": new_username.strip()}).fetchone()
if row:
default_perms = {
"admin": {m: "admin" for m in MODULES},
"coordinator": {"servers": "admin", "campaigns": "admin", "qualys": "admin", "audit": "admin",
"settings": "view", "users": "view", "planning": "admin", "specifics": "admin"},
"operator": {"servers": "admin", "campaigns": "view", "qualys": "admin", "audit": "admin",
"settings": "view", "planning": "view", "specifics": "admin"},
"viewer": {"servers": "view", "campaigns": "view", "audit": "view", "planning": "view"},
}
for mod, lvl in default_perms.get(new_role, {}).items():
db.execute(text(
"INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l) ON CONFLICT DO NOTHING"
), {"uid": row.id, "m": mod, "l": lvl})
db.commit()
return RedirectResponse(url="/users?msg=added", status_code=303)
@router.post("/users/{user_id}/permissions")
async def user_permissions_save(request: Request, user_id: int, db=Depends(get_db)):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
if not can_edit(perms, "users"):
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
form = await request.form()
db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id})
for mod in MODULES:
lvl = form.get(f"perm_{mod}", "")
if lvl and lvl in LEVELS:
db.execute(text(
"INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l)"
), {"uid": user_id, "m": mod, "l": lvl})
db.commit()
return RedirectResponse(url=f"/users?msg=perms_saved", status_code=303)
@router.post("/users/{user_id}/edit")
async def user_edit(request: Request, user_id: int, db=Depends(get_db),
display_name: str = Form(""), email: str = Form(""),
role: str = Form("")):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
if not can_edit(perms, "users"):
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
updates = []
params = {"id": user_id}
if display_name:
updates.append("display_name = :dn"); params["dn"] = display_name
if email:
updates.append("email = :em"); params["em"] = email
if role:
updates.append("role = :r"); params["r"] = role
if updates:
db.execute(text(f"UPDATE users SET {', '.join(updates)} WHERE id = :id"), params)
db.commit()
return RedirectResponse(url="/users?msg=edited", status_code=303)
@router.post("/users/{user_id}/toggle")
async def user_toggle(request: Request, user_id: int, db=Depends(get_db)):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
if not can_edit(perms, "users"):
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
# Empecher de se desactiver soi-meme
if user_id == user.get("uid"):
return RedirectResponse(url="/users?msg=cant_self", status_code=303)
db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id = :id"), {"id": user_id})
db.commit()
return RedirectResponse(url="/users?msg=toggled", status_code=303)
@router.post("/users/{user_id}/password")
async def user_password(request: Request, user_id: int, db=Depends(get_db),
new_password: str = Form(...)):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
if not can_edit(perms, "users"):
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
pw_hash = hash_password(new_password)
db.execute(text("UPDATE users SET password_hash = :ph WHERE id = :id"),
{"ph": pw_hash, "id": user_id})
db.commit()
return RedirectResponse(url="/users?msg=password_changed", status_code=303)
@router.post("/users/{user_id}/delete")
async def user_delete(request: Request, user_id: int, db=Depends(get_db)):
user, perms, redirect = _check_access(request, db)
if redirect:
return redirect
if not can_admin(perms, "users"):
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
if user_id == user.get("uid"):
return RedirectResponse(url="/users?msg=cant_self", status_code=303)
db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id})
db.execute(text("DELETE FROM users WHERE id = :id"), {"id": user_id})
db.commit()
return RedirectResponse(url="/users?msg=deleted", status_code=303)