- Permissions 100% depuis user_permissions (plus de hardcode) - Middleware injecte perms dans chaque requête - Créneaux auto: 09h-12h30 / 14h-16h45, pas 15min, hprod lun-mar, prod mer-jeu - Assignations par défaut: par domaine, app_type, zone, serveur (table default_assignments) - Auto-liaison app_group: même intervenant recette+prod - Audit Splunk: /var/log/patchcenter_audit.json (JSON one-line par event) - Login/logout/campagnes/prereqs loggés en base + fichier - Page erreur maintenance (500/404) avec contact SecOps - Accents français dans toute lUI - Operator affiché comme Intervenant - Session 1h, redirect / vers dashboard si connecté - Demo mode prereqs (DEMO_MODE=True) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
191 lines
8.0 KiB
Python
191 lines
8.0 KiB
Python
"""Router users — gestion utilisateurs + permissions par module"""
|
|
from fastapi import APIRouter, Request, Depends, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
|
|
from ..auth import hash_password
|
|
from ..config import APP_NAME
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
MODULES = ["servers", "campaigns", "qualys", "audit", "settings", "users", "planning", "specifics"]
|
|
LEVELS = ["view", "edit", "admin"]
|
|
|
|
|
|
def _get_users_with_perms(db):
|
|
users = db.execute(text(
|
|
"SELECT id, username, display_name, email, role, auth_type, is_active, last_login FROM users ORDER BY username"
|
|
)).fetchall()
|
|
result = []
|
|
for u in users:
|
|
perms = {}
|
|
rows = db.execute(text(
|
|
"SELECT module, level FROM user_permissions WHERE user_id = :uid"
|
|
), {"uid": u.id}).fetchall()
|
|
for r in rows:
|
|
perms[r.module] = r.level
|
|
result.append({"user": u, "perms": perms})
|
|
return result
|
|
|
|
|
|
def _check_access(request, db):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return None, None, RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "users"):
|
|
return None, None, RedirectResponse(url="/dashboard")
|
|
return user, perms, None
|
|
|
|
|
|
@router.get("/users", response_class=HTMLResponse)
|
|
async def users_page(request: Request, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
users_data = _get_users_with_perms(db)
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "users_data": users_data,
|
|
"modules": MODULES, "levels": LEVELS,
|
|
"can_edit_users": can_edit(perms, "users"),
|
|
"msg": request.query_params.get("msg"),
|
|
})
|
|
return templates.TemplateResponse("users.html", ctx)
|
|
|
|
|
|
@router.post("/users/add")
|
|
async def user_add(request: Request, db=Depends(get_db),
|
|
new_username: str = Form(...), new_display_name: str = Form(...),
|
|
new_email: str = Form(""), new_password: str = Form(...),
|
|
new_role: str = Form("operator")):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_edit(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
|
|
# Verifier si username existe deja
|
|
existing = db.execute(text("SELECT id, is_active FROM users WHERE LOWER(username) = LOWER(:u)"),
|
|
{"u": new_username.strip()}).fetchone()
|
|
if existing:
|
|
if not existing.is_active:
|
|
return RedirectResponse(url=f"/users?msg=exists_inactive", status_code=303)
|
|
return RedirectResponse(url=f"/users?msg=exists", status_code=303)
|
|
|
|
pw_hash = hash_password(new_password)
|
|
db.execute(text("""
|
|
INSERT INTO users (username, display_name, email, password_hash, role)
|
|
VALUES (:u, :dn, :e, :ph, :r)
|
|
"""), {"u": new_username.strip(), "dn": new_display_name, "e": new_email or None,
|
|
"ph": pw_hash, "r": new_role})
|
|
|
|
row = db.execute(text("SELECT id FROM users WHERE username = :u"), {"u": new_username.strip()}).fetchone()
|
|
if row:
|
|
default_perms = {
|
|
"admin": {m: "admin" for m in MODULES},
|
|
"coordinator": {"servers": "admin", "campaigns": "admin", "qualys": "admin", "audit": "admin",
|
|
"settings": "view", "users": "view", "planning": "admin", "specifics": "admin"},
|
|
"operator": {"servers": "admin", "campaigns": "view", "qualys": "admin", "audit": "admin",
|
|
"settings": "view", "planning": "view", "specifics": "admin"},
|
|
"viewer": {"servers": "view", "campaigns": "view", "audit": "view", "planning": "view"},
|
|
}
|
|
for mod, lvl in default_perms.get(new_role, {}).items():
|
|
db.execute(text(
|
|
"INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l) ON CONFLICT DO NOTHING"
|
|
), {"uid": row.id, "m": mod, "l": lvl})
|
|
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=added", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/permissions")
|
|
async def user_permissions_save(request: Request, user_id: int, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_edit(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
|
|
form = await request.form()
|
|
db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id})
|
|
for mod in MODULES:
|
|
lvl = form.get(f"perm_{mod}", "")
|
|
if lvl and lvl in LEVELS:
|
|
db.execute(text(
|
|
"INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l)"
|
|
), {"uid": user_id, "m": mod, "l": lvl})
|
|
db.commit()
|
|
return RedirectResponse(url=f"/users?msg=perms_saved", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/edit")
|
|
async def user_edit(request: Request, user_id: int, db=Depends(get_db),
|
|
display_name: str = Form(""), email: str = Form(""),
|
|
role: str = Form("")):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_edit(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
|
|
updates = []
|
|
params = {"id": user_id}
|
|
if display_name:
|
|
updates.append("display_name = :dn"); params["dn"] = display_name
|
|
if email:
|
|
updates.append("email = :em"); params["em"] = email
|
|
if role:
|
|
updates.append("role = :r"); params["r"] = role
|
|
if updates:
|
|
db.execute(text(f"UPDATE users SET {', '.join(updates)} WHERE id = :id"), params)
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=edited", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/toggle")
|
|
async def user_toggle(request: Request, user_id: int, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_edit(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
# Empecher de se desactiver soi-meme
|
|
if user_id == user.get("uid"):
|
|
return RedirectResponse(url="/users?msg=cant_self", status_code=303)
|
|
db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id = :id"), {"id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=toggled", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/password")
|
|
async def user_password(request: Request, user_id: int, db=Depends(get_db),
|
|
new_password: str = Form(...)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_edit(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
pw_hash = hash_password(new_password)
|
|
db.execute(text("UPDATE users SET password_hash = :ph WHERE id = :id"),
|
|
{"ph": pw_hash, "id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=password_changed", status_code=303)
|
|
|
|
|
|
@router.post("/users/{user_id}/delete")
|
|
async def user_delete(request: Request, user_id: int, db=Depends(get_db)):
|
|
user, perms, redirect = _check_access(request, db)
|
|
if redirect:
|
|
return redirect
|
|
if not can_admin(perms, "users"):
|
|
return RedirectResponse(url="/users?msg=forbidden", status_code=303)
|
|
if user_id == user.get("uid"):
|
|
return RedirectResponse(url="/users?msg=cant_self", status_code=303)
|
|
db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id})
|
|
db.execute(text("DELETE FROM users WHERE id = :id"), {"id": user_id})
|
|
db.commit()
|
|
return RedirectResponse(url="/users?msg=deleted", status_code=303)
|