- Users: 4 profils (admin/coordinator/operator/viewer) remplacent la matrix - /users/add: picker contacts iTop (plus de creation libre) - /me/change-password: flow force_password_change - LDAP: service + section settings + option login - Sync iTop contacts: filtre par teams (SecOps/iPOP/Externe/DSI/Admin DSI) - Auto-desactivation users si contact inactif - etat: alignement sur enum iTop (production/implementation/stock/obsolete) - Menu: Contacts dans Administration, Serveurs en groupe repliable - Audit bases: demo/prod via JWT mode Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
"""Dependances communes pour les routers"""
|
|
from fastapi import Request
|
|
from sqlalchemy import text
|
|
from .auth import decode_token
|
|
from .database import SessionLocal, SessionLocalDemo
|
|
|
|
|
|
def get_db(request: Request = None):
|
|
"""Retourne la session DB selon le mode (demo/reel) stocke dans le cookie JWT."""
|
|
demo = False
|
|
if request:
|
|
user = get_current_user(request)
|
|
if user and user.get("mode") == "demo":
|
|
demo = True
|
|
factory = SessionLocalDemo if demo else SessionLocal
|
|
db = factory()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def get_db_for_login(demo: bool = False):
|
|
"""Session DB pour le login (avant que le cookie existe)."""
|
|
factory = SessionLocalDemo if demo else SessionLocal
|
|
db = factory()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def get_current_user(request: Request):
|
|
"""Extrait l'utilisateur du cookie JWT"""
|
|
token = request.cookies.get("access_token")
|
|
if not token:
|
|
return None
|
|
return decode_token(token)
|
|
|
|
|
|
def get_user_perms(db, user):
|
|
"""Charge les permissions : profil (role) + overrides de user_permissions.
|
|
Retourne un dict {module: level} ex: {'servers': 'admin', 'campaigns': 'edit'}"""
|
|
if not user:
|
|
return {}
|
|
uid = user.get("uid")
|
|
if not uid:
|
|
return {}
|
|
# Récupère le rôle
|
|
row = db.execute(text("SELECT role FROM users WHERE id = :uid"), {"uid": uid}).fetchone()
|
|
if not row:
|
|
return {}
|
|
# Base : permissions du profil
|
|
from .services.profile_service import get_profile_perms
|
|
perms = get_profile_perms(row.role)
|
|
# Overrides éventuels depuis user_permissions (niveau plus élevé uniquement)
|
|
rank = {"view": 1, "edit": 2, "admin": 3}
|
|
overrides = db.execute(text(
|
|
"SELECT module, level FROM user_permissions WHERE user_id = :uid"
|
|
), {"uid": uid}).fetchall()
|
|
for r in overrides:
|
|
cur = perms.get(r.module)
|
|
if not cur or rank.get(r.level, 0) > rank.get(cur, 0):
|
|
perms[r.module] = r.level
|
|
return perms
|
|
|
|
|
|
def can_view(perms, module):
|
|
"""L'utilisateur peut-il voir ce module ?"""
|
|
return module in perms
|
|
|
|
|
|
def can_edit(perms, module):
|
|
"""L'utilisateur peut-il editer ce module ?"""
|
|
return perms.get(module) in ("edit", "admin")
|
|
|
|
|
|
def can_admin(perms, module):
|
|
"""L'utilisateur a-t-il les droits admin sur ce module ?"""
|
|
return perms.get(module) == "admin"
|
|
|
|
|
|
def base_context(request, db, user):
|
|
"""Context de base pour tous les templates (user + perms)"""
|
|
perms = get_user_perms(db, user)
|
|
return {
|
|
"request": request,
|
|
"user": user,
|
|
"perms": perms,
|
|
}
|