From 8479d7280e709693abada12c27c87a004f8487c5 Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Sun, 12 Apr 2026 18:50:43 +0200 Subject: [PATCH] Users/Contacts: workflow profils + LDAP + sync iTop + etat aligne - Users: 4 profils (admin/coordinator/operator/viewer) remplacent la matrix - /users/add: picker contacts iTop (plus de creation libre) - /me/change-password: flow force_password_change - LDAP: service + section settings + option login - Sync iTop contacts: filtre par teams (SecOps/iPOP/Externe/DSI/Admin DSI) - Auto-desactivation users si contact inactif - etat: alignement sur enum iTop (production/implementation/stock/obsolete) - Menu: Contacts dans Administration, Serveurs en groupe repliable - Audit bases: demo/prod via JWT mode Co-Authored-By: Claude Opus 4.6 (1M context) --- app/dependencies.py | 19 +- app/main.py | 30 ++- app/routers/auth.py | 60 ++++-- app/routers/settings.py | 30 +++ app/routers/users.py | 313 +++++++++++++++++++---------- app/services/itop_service.py | 258 ++++++++++++++++++++---- app/services/ldap_service.py | 113 +++++++++++ app/services/profile_service.py | 58 ++++++ app/templates/base.html | 127 +++++++++--- app/templates/change_password.html | 54 +++++ app/templates/login.html | 15 ++ app/templates/settings.html | 74 +++++++ app/templates/users.html | 235 +++++++++++----------- app/templates/users_add.html | 122 +++++++++++ migrate_applications.sql | 15 ++ migrate_etat.sql | 7 + migrate_users.sql | 23 +++ replace_etat.py | 52 +++++ 18 files changed, 1291 insertions(+), 314 deletions(-) create mode 100644 app/services/ldap_service.py create mode 100644 app/services/profile_service.py create mode 100644 app/templates/change_password.html create mode 100644 app/templates/users_add.html create mode 100644 migrate_applications.sql create mode 100644 migrate_etat.sql create mode 100644 migrate_users.sql create mode 100644 replace_etat.py diff --git a/app/dependencies.py b/app/dependencies.py index 8b52545..588a59e 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -39,17 +39,30 @@ def get_current_user(request: Request): def get_user_perms(db, user): - """Charge les permissions depuis la base pour un user. + """Charge les permissions : profil (role) + overrides de user_permissions. Retourne un dict {module: level} ex: {'servers': 'admin', 'campaigns': 'edit'}""" if not user: return {} uid = user.get("uid") if not uid: return {} - rows = db.execute(text( + # Récupère le rôle + row = db.execute(text("SELECT role FROM users WHERE id = :uid"), {"uid": uid}).fetchone() + if not row: + return {} + # Base : permissions du profil + from .services.profile_service import get_profile_perms + perms = get_profile_perms(row.role) + # Overrides éventuels depuis user_permissions (niveau plus élevé uniquement) + rank = {"view": 1, "edit": 2, "admin": 3} + overrides = db.execute(text( "SELECT module, level FROM user_permissions WHERE user_id = :uid" ), {"uid": uid}).fetchall() - return {r.module: r.level for r in rows} + for r in overrides: + cur = perms.get(r.module) + if not cur or rank.get(r.level, 0) > rank.get(cur, 0): + perms[r.module] = r.level + return perms def can_view(perms, module): diff --git a/app/main.py b/app/main.py index 0853329..c437b1c 100644 --- a/app/main.py +++ b/app/main.py @@ -5,23 +5,42 @@ from fastapi.staticfiles import StaticFiles from starlette.middleware.base import BaseHTTPMiddleware from .config import APP_NAME, APP_VERSION from .dependencies import get_current_user, get_user_perms -from .database import SessionLocal -from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, safe_patching, audit_full, quickwin, referentiel +from .database import SessionLocal, SessionLocalDemo +from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, quickwin, referentiel, patching class PermissionsMiddleware(BaseHTTPMiddleware): - """Injecte user + perms dans request.state pour tous les templates""" + """Injecte user + perms dans request.state pour tous les templates. + Gère aussi la redirection si force_password_change est activé.""" async def dispatch(self, request: Request, call_next): user = get_current_user(request) perms = {} + must_change_pwd = False if user: - db = SessionLocal() + # Sélectionner la base selon le mode JWT (prod/demo) + factory = SessionLocalDemo if user.get("mode") == "demo" else SessionLocal + db = factory() try: perms = get_user_perms(db, user) + # Check force_password_change + from sqlalchemy import text + row = db.execute(text("SELECT force_password_change FROM users WHERE id=:uid"), + {"uid": user.get("uid")}).fetchone() + if row and row.force_password_change: + must_change_pwd = True finally: db.close() request.state.user = user request.state.perms = perms + request.state.must_change_pwd = must_change_pwd + + # Redirect vers change-password si forcé (sauf pour les routes de changement/logout/static) + if must_change_pwd and user: + allowed = ("/me/change-password", "/logout", "/static/") + if not any(request.url.path.startswith(p) for p in allowed): + from fastapi.responses import RedirectResponse + return RedirectResponse(url="/me/change-password", status_code=303) + response = await call_next(request) return response @@ -41,10 +60,9 @@ app.include_router(specifics.router) app.include_router(audit.router) app.include_router(contacts.router) app.include_router(qualys.router) -app.include_router(safe_patching.router) -app.include_router(audit_full.router) app.include_router(quickwin.router) app.include_router(referentiel.router) +app.include_router(patching.router) @app.get("/") diff --git a/app/routers/auth.py b/app/routers/auth.py index 3c8a290..32f3c10 100644 --- a/app/routers/auth.py +++ b/app/routers/auth.py @@ -6,6 +6,7 @@ from ..dependencies import get_current_user from ..database import SessionLocal, SessionLocalDemo from ..auth import verify_password, create_access_token, hash_password from ..services.audit_service import log_login, log_logout, log_login_failed +from ..services.ldap_service import is_enabled as ldap_enabled, authenticate as ldap_authenticate from ..config import APP_NAME, APP_VERSION router = APIRouter() @@ -13,41 +14,62 @@ templates = Jinja2Templates(directory="app/templates") @router.get("/login", response_class=HTMLResponse) async def login_page(request: Request): + db = SessionLocal() + try: + ldap_ok = ldap_enabled(db) + finally: + db.close() return templates.TemplateResponse("login.html", { - "request": request, "app_name": APP_NAME, "version": APP_VERSION, "error": None + "request": request, "app_name": APP_NAME, "version": APP_VERSION, + "error": None, "ldap_enabled": ldap_ok, }) @router.post("/login") async def login(request: Request, username: str = Form(...), password: str = Form(...), - mode: str = Form("reel")): + mode: str = Form("reel"), auth_method: str = Form("local")): # Select DB based on mode factory = SessionLocalDemo if mode == "demo" else SessionLocal db = factory() try: - row = db.execute(text("SELECT id, username, password_hash, role, is_active FROM users WHERE LOWER(username) = LOWER(:u)"), + row = db.execute(text("SELECT id, username, password_hash, role, is_active, auth_type FROM users WHERE LOWER(username) = LOWER(:u)"), {"u": username}).fetchone() + + ldap_is_on = ldap_enabled(db) + err_template = lambda msg: templates.TemplateResponse("login.html", { + "request": request, "app_name": APP_NAME, "version": APP_VERSION, + "error": msg, "ldap_enabled": ldap_is_on + }) + if not row: log_login_failed(db, request, username) db.commit() - return templates.TemplateResponse("login.html", { - "request": request, "app_name": APP_NAME, "version": APP_VERSION, "error": "Utilisateur inconnu" - }) + return err_template("Utilisateur inconnu") if not row.is_active: log_login_failed(db, request, username) db.commit() - return templates.TemplateResponse("login.html", { - "request": request, "app_name": APP_NAME, "version": APP_VERSION, "error": "Compte desactive" - }) - try: - ok = verify_password(password, row.password_hash) - except Exception: - ok = False - if not ok: - log_login_failed(db, request, username) - db.commit() - return templates.TemplateResponse("login.html", { - "request": request, "app_name": APP_NAME, "version": APP_VERSION, "error": "Mot de passe incorrect" - }) + return err_template("Compte desactive") + + # Choix de la methode d'auth + use_ldap = (auth_method == "ldap") or (row.auth_type == "ldap") + if use_ldap and not ldap_is_on: + return err_template("LDAP non active") + + if use_ldap: + result = ldap_authenticate(db, username, password) + if not result.get("ok"): + log_login_failed(db, request, username) + db.commit() + return err_template(result.get("msg") or "Authentification LDAP echouee") + ok = True + else: + try: + ok = verify_password(password, row.password_hash or "") + except Exception: + ok = False + if not ok: + log_login_failed(db, request, username) + db.commit() + return err_template("Mot de passe incorrect") # Include mode in JWT token token = create_access_token({"sub": row.username, "role": row.role, "uid": row.id, "mode": mode}) user = {"sub": row.username, "role": row.role, "uid": row.id, "mode": mode} diff --git a/app/routers/settings.py b/app/routers/settings.py index 481e8a8..92cecd7 100644 --- a/app/routers/settings.py +++ b/app/routers/settings.py @@ -75,6 +75,20 @@ SECTIONS = { ("teams_sp_client_secret", "App Client Secret", True), ("teams_sp_tenant_id", "Tenant ID", False), ], + "ldap": [ + ("ldap_enabled", "Activer LDAP/AD (true/false)", False), + ("ldap_server", "Serveur (ex: ldaps://ad.sanef.com:636)", False), + ("ldap_base_dn", "Base DN (ex: DC=sanef,DC=com)", False), + ("ldap_bind_dn", "Compte de bind (DN complet)", False), + ("ldap_bind_pwd", "Mot de passe compte de bind", True), + ("ldap_user_filter", "Filtre user (ex: (sAMAccountName={username}))", False), + ("ldap_email_attr", "Attribut email (ex: mail)", False), + ("ldap_name_attr", "Attribut nom affiché (ex: displayName)", False), + ("ldap_tls", "TLS (true/false)", False), + ], + "itop_contacts": [ + ("itop_contact_teams", "Teams iTop à synchroniser (séparées par ,)", False), + ], } @@ -102,6 +116,8 @@ SECTION_ACCESS = { "splunk": {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]}, "teams": {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]}, "itop": {"visible": ["admin"], "editable": ["admin"]}, + "itop_contacts": {"visible": ["admin"], "editable": ["admin"]}, + "ldap": {"visible": ["admin"], "editable": ["admin"]}, "security": {"visible": ["admin"], "editable": ["admin"]}, } @@ -173,6 +189,20 @@ async def settings_save(request: Request, section: str, db=Depends(get_db)): return templates.TemplateResponse("settings.html", ctx) +@router.post("/settings/ldap/test") +async def settings_ldap_test(request: Request, db=Depends(get_db)): + """Teste la connexion LDAP avec le compte de bind.""" + from fastapi.responses import JSONResponse + user = get_current_user(request) + if not user: + return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) + perms = get_user_perms(db, user) + if not can_edit(perms, "settings"): + return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) + from ..services.ldap_service import test_connection + return JSONResponse(test_connection(db)) + + # --- vCenter CRUD --- @router.post("/settings/vcenter/add", response_class=HTMLResponse) diff --git a/app/routers/users.py b/app/routers/users.py index f381ac9..4efca99 100644 --- a/app/routers/users.py +++ b/app/routers/users.py @@ -1,180 +1,273 @@ -"""Router users — gestion utilisateurs + permissions par module""" +"""Router users — gestion utilisateurs par profil + picker de contacts iTop. + +Seul l'admin peut gérer les utilisateurs. +Les utilisateurs sont créés depuis les contacts synchronisés d'iTop (sauf admin local). +""" from fastapi import APIRouter, Request, Depends, Form from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy import text from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context from ..auth import hash_password +from ..services.profile_service import PROFILES, PROFILE_LABELS, PROFILE_DESCRIPTIONS from ..config import APP_NAME router = APIRouter() templates = Jinja2Templates(directory="app/templates") -MODULES = ["servers", "campaigns", "qualys", "audit", "settings", "users", "planning", "specifics"] -LEVELS = ["view", "edit", "admin"] +# Profils disponibles (keys du PROFILES dict) +ROLES = ["admin", "coordinator", "operator", "viewer"] -def _get_users_with_perms(db): - users = db.execute(text( - "SELECT id, username, display_name, email, role, auth_type, is_active, last_login FROM users ORDER BY username" - )).fetchall() - result = [] - for u in users: - perms = {} - rows = db.execute(text( - "SELECT module, level FROM user_permissions WHERE user_id = :uid" - ), {"uid": u.id}).fetchall() - for r in rows: - perms[r.module] = r.level - result.append({"user": u, "perms": perms}) - return result +def _get_users(db): + users = db.execute(text(""" + SELECT u.id, u.username, u.display_name, u.email, u.role, u.auth_type, + u.is_active, u.last_login, u.itop_person_id, u.force_password_change, + c.team as contact_team, c.function as contact_function + FROM users u + LEFT JOIN contacts c ON c.itop_id = u.itop_person_id + ORDER BY u.is_active DESC, u.username + """)).fetchall() + return users -def _check_access(request, db): +def _check_access(request, db, require_edit=False): user = get_current_user(request) if not user: return None, None, RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "users"): return None, None, RedirectResponse(url="/dashboard") + if require_edit and not can_edit(perms, "users"): + return None, None, RedirectResponse(url="/users?msg=forbidden", status_code=303) return user, perms, None +@router.get("/me/change-password", response_class=HTMLResponse) +async def me_change_password_page(request: Request, db=Depends(get_db)): + user = get_current_user(request) + if not user: + return RedirectResponse(url="/login") + return templates.TemplateResponse("change_password.html", { + "request": request, "app_name": APP_NAME, "user": user, + "error": None, "perms": {}, + }) + + +@router.post("/me/change-password") +async def me_change_password(request: Request, db=Depends(get_db), + current_password: str = Form(...), + new_password: str = Form(...), + confirm_password: str = Form(...)): + user = get_current_user(request) + if not user: + return RedirectResponse(url="/login") + + from ..auth import verify_password + row = db.execute(text("SELECT id, password_hash FROM users WHERE id=:uid"), + {"uid": user.get("uid")}).fetchone() + if not row: + return RedirectResponse(url="/logout", status_code=303) + + err = None + if not verify_password(current_password, row.password_hash or ""): + err = "Mot de passe actuel incorrect" + elif new_password != confirm_password: + err = "Les deux mots de passe ne correspondent pas" + elif len(new_password) < 8: + err = "Le nouveau mot de passe doit faire au moins 8 caractères" + elif new_password == current_password: + err = "Le nouveau mot de passe doit être différent de l'actuel" + + if err: + return templates.TemplateResponse("change_password.html", { + "request": request, "app_name": APP_NAME, "user": user, + "error": err, "perms": {}, + }) + + pw_hash = hash_password(new_password) + db.execute(text("""UPDATE users SET password_hash=:ph, force_password_change=false, + updated_at=NOW() WHERE id=:uid"""), + {"ph": pw_hash, "uid": user.get("uid")}) + db.commit() + return RedirectResponse(url="/dashboard?msg=password_changed", status_code=303) + + @router.get("/users", response_class=HTMLResponse) async def users_page(request: Request, db=Depends(get_db)): user, perms, redirect = _check_access(request, db) if redirect: return redirect - users_data = _get_users_with_perms(db) + + users = _get_users(db) + + # Compter les contacts disponibles pour ajout (pas encore users) + available_count = db.execute(text(""" + SELECT COUNT(*) FROM contacts c + WHERE c.itop_id IS NOT NULL AND c.is_active = true + AND NOT EXISTS (SELECT 1 FROM users u WHERE u.itop_person_id = c.itop_id) + """)).scalar() + ctx = base_context(request, db, user) ctx.update({ - "app_name": APP_NAME, "users_data": users_data, - "modules": MODULES, "levels": LEVELS, + "app_name": APP_NAME, "users": users, + "available_count": available_count, + "roles": ROLES, "profile_labels": PROFILE_LABELS, + "profile_descriptions": PROFILE_DESCRIPTIONS, "can_edit_users": can_edit(perms, "users"), + "can_admin_users": can_admin(perms, "users"), "msg": request.query_params.get("msg"), }) return templates.TemplateResponse("users.html", ctx) -@router.post("/users/add") -async def user_add(request: Request, db=Depends(get_db), - new_username: str = Form(...), new_display_name: str = Form(...), - new_email: str = Form(""), new_password: str = Form(...), - new_role: str = Form("operator")): - user, perms, redirect = _check_access(request, db) +@router.get("/users/add", response_class=HTMLResponse) +async def users_add_page(request: Request, db=Depends(get_db), + search: str = "", team: str = ""): + user, perms, redirect = _check_access(request, db, require_edit=True) if redirect: return redirect - if not can_edit(perms, "users"): - return RedirectResponse(url="/users?msg=forbidden", status_code=303) - # Verifier si username existe deja - existing = db.execute(text("SELECT id, is_active FROM users WHERE LOWER(username) = LOWER(:u)"), - {"u": new_username.strip()}).fetchone() + # Lister les contacts iTop non-encore-users + where = ["c.itop_id IS NOT NULL", "c.is_active = true", + "NOT EXISTS (SELECT 1 FROM users u WHERE u.itop_person_id = c.itop_id)"] + params = {} + if search: + where.append("(c.name ILIKE :s OR c.email ILIKE :s)") + params["s"] = f"%{search}%" + if team: + where.append("c.team = :t") + params["t"] = team + + wc = " AND ".join(where) + contacts = db.execute(text(f""" + SELECT c.id, c.itop_id, c.name, c.email, c.telephone, c.team, c.function, c.role + FROM contacts c WHERE {wc} + ORDER BY c.team, c.name LIMIT 200 + """), params).fetchall() + + # Liste des teams disponibles pour filtrer + teams = db.execute(text(""" + SELECT DISTINCT team FROM contacts WHERE team IS NOT NULL AND team != '' + ORDER BY team + """)).fetchall() + + ctx = base_context(request, db, user) + ctx.update({ + "app_name": APP_NAME, "contacts": contacts, "teams": teams, + "roles": ROLES, "profile_labels": PROFILE_LABELS, + "profile_descriptions": PROFILE_DESCRIPTIONS, + "search": search, "team_filter": team, + }) + return templates.TemplateResponse("users_add.html", ctx) + + +@router.post("/users/add") +async def users_add(request: Request, db=Depends(get_db), + contact_id: str = Form(""), role: str = Form("viewer"), + username: str = Form(""), password: str = Form(""), + auth_type: str = Form("local"), + force_change: str = Form("")): + user, perms, redirect = _check_access(request, db, require_edit=True) + if redirect: + return redirect + if role not in ROLES: + return RedirectResponse(url="/users?msg=invalid_role", status_code=303) + + # Récupérer le contact iTop + contact = None + if contact_id and contact_id.strip().isdigit(): + contact = db.execute(text( + "SELECT id, itop_id, name, email FROM contacts WHERE id = :cid" + ), {"cid": int(contact_id)}).fetchone() + if not contact: + return RedirectResponse(url="/users/add?msg=contact_required", status_code=303) + + # Générer username si non fourni (email avant @) + if not username.strip(): + username = contact.email.split("@")[0] if contact.email else contact.name.lower().replace(" ", ".") + + # Verifier duplicats + existing = db.execute(text("SELECT id FROM users WHERE LOWER(username)=LOWER(:u) OR itop_person_id=:iid"), + {"u": username.strip(), "iid": contact.itop_id}).fetchone() if existing: - if not existing.is_active: - return RedirectResponse(url=f"/users?msg=exists_inactive", status_code=303) - return RedirectResponse(url=f"/users?msg=exists", status_code=303) + return RedirectResponse(url="/users?msg=exists", status_code=303) + + pw_hash = hash_password(password) if password else hash_password("ChangeMe!" + str(contact.itop_id)) + fpc = True if force_change == "on" else (not password) - pw_hash = hash_password(new_password) db.execute(text(""" - INSERT INTO users (username, display_name, email, password_hash, role) - VALUES (:u, :dn, :e, :ph, :r) - """), {"u": new_username.strip(), "dn": new_display_name, "e": new_email or None, - "ph": pw_hash, "r": new_role}) - - row = db.execute(text("SELECT id FROM users WHERE username = :u"), {"u": new_username.strip()}).fetchone() - if row: - default_perms = { - "admin": {m: "admin" for m in MODULES}, - "coordinator": {"servers": "admin", "campaigns": "admin", "qualys": "admin", "audit": "admin", - "settings": "view", "users": "view", "planning": "admin", "specifics": "admin"}, - "operator": {"servers": "admin", "campaigns": "view", "qualys": "admin", "audit": "admin", - "settings": "view", "planning": "view", "specifics": "admin"}, - "viewer": {"servers": "view", "campaigns": "view", "audit": "view", "planning": "view"}, - } - for mod, lvl in default_perms.get(new_role, {}).items(): - db.execute(text( - "INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l) ON CONFLICT DO NOTHING" - ), {"uid": row.id, "m": mod, "l": lvl}) - + INSERT INTO users (username, display_name, email, password_hash, role, + auth_type, itop_person_id, is_active, force_password_change) + VALUES (:u, :dn, :e, :ph, :r, :at, :iid, true, :fpc) + """), { + "u": username.strip(), "dn": contact.name, "e": contact.email, + "ph": pw_hash, "r": role, "at": auth_type, + "iid": contact.itop_id, "fpc": fpc, + }) db.commit() return RedirectResponse(url="/users?msg=added", status_code=303) -@router.post("/users/{user_id}/permissions") -async def user_permissions_save(request: Request, user_id: int, db=Depends(get_db)): - user, perms, redirect = _check_access(request, db) +@router.post("/users/{user_id}/role") +async def user_change_role(request: Request, user_id: int, db=Depends(get_db), + role: str = Form(...)): + user, perms, redirect = _check_access(request, db, require_edit=True) if redirect: return redirect - if not can_edit(perms, "users"): - return RedirectResponse(url="/users?msg=forbidden", status_code=303) - - form = await request.form() - db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id}) - for mod in MODULES: - lvl = form.get(f"perm_{mod}", "") - if lvl and lvl in LEVELS: - db.execute(text( - "INSERT INTO user_permissions (user_id, module, level) VALUES (:uid, :m, :l)" - ), {"uid": user_id, "m": mod, "l": lvl}) + if role not in ROLES: + return RedirectResponse(url="/users?msg=invalid_role", status_code=303) + # Empêche un admin de se rétrograder lui-même + if user_id == user.get("uid") and role != "admin": + return RedirectResponse(url="/users?msg=cant_demote_self", status_code=303) + db.execute(text("UPDATE users SET role=:r, updated_at=NOW() WHERE id=:id"), + {"r": role, "id": user_id}) db.commit() - return RedirectResponse(url=f"/users?msg=perms_saved", status_code=303) - - -@router.post("/users/{user_id}/edit") -async def user_edit(request: Request, user_id: int, db=Depends(get_db), - display_name: str = Form(""), email: str = Form(""), - role: str = Form("")): - user, perms, redirect = _check_access(request, db) - if redirect: - return redirect - if not can_edit(perms, "users"): - return RedirectResponse(url="/users?msg=forbidden", status_code=303) - - updates = [] - params = {"id": user_id} - if display_name: - updates.append("display_name = :dn"); params["dn"] = display_name - if email: - updates.append("email = :em"); params["em"] = email - if role: - updates.append("role = :r"); params["r"] = role - if updates: - db.execute(text(f"UPDATE users SET {', '.join(updates)} WHERE id = :id"), params) - db.commit() - return RedirectResponse(url="/users?msg=edited", status_code=303) + return RedirectResponse(url="/users?msg=role_changed", status_code=303) @router.post("/users/{user_id}/toggle") async def user_toggle(request: Request, user_id: int, db=Depends(get_db)): - user, perms, redirect = _check_access(request, db) + user, perms, redirect = _check_access(request, db, require_edit=True) if redirect: return redirect - if not can_edit(perms, "users"): - return RedirectResponse(url="/users?msg=forbidden", status_code=303) - # Empecher de se desactiver soi-meme if user_id == user.get("uid"): return RedirectResponse(url="/users?msg=cant_self", status_code=303) - db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id = :id"), {"id": user_id}) + db.execute(text("UPDATE users SET is_active = NOT is_active WHERE id=:id"), {"id": user_id}) db.commit() return RedirectResponse(url="/users?msg=toggled", status_code=303) @router.post("/users/{user_id}/password") async def user_password(request: Request, user_id: int, db=Depends(get_db), - new_password: str = Form(...)): - user, perms, redirect = _check_access(request, db) + new_password: str = Form(...), + force_change: str = Form("")): + user, perms, redirect = _check_access(request, db, require_edit=True) if redirect: return redirect - if not can_edit(perms, "users"): - return RedirectResponse(url="/users?msg=forbidden", status_code=303) pw_hash = hash_password(new_password) - db.execute(text("UPDATE users SET password_hash = :ph WHERE id = :id"), - {"ph": pw_hash, "id": user_id}) + fpc = (force_change == "on") + db.execute(text("""UPDATE users SET password_hash=:ph, force_password_change=:fpc, + updated_at=NOW() WHERE id=:id"""), + {"ph": pw_hash, "fpc": fpc, "id": user_id}) db.commit() return RedirectResponse(url="/users?msg=password_changed", status_code=303) +@router.post("/users/{user_id}/auth_type") +async def user_auth_type(request: Request, user_id: int, db=Depends(get_db), + auth_type: str = Form(...)): + user, perms, redirect = _check_access(request, db, require_edit=True) + if redirect: + return redirect + if auth_type not in ("local", "ldap"): + return RedirectResponse(url="/users?msg=invalid", status_code=303) + db.execute(text("UPDATE users SET auth_type=:at, updated_at=NOW() WHERE id=:id"), + {"at": auth_type, "id": user_id}) + db.commit() + return RedirectResponse(url="/users?msg=auth_changed", status_code=303) + + @router.post("/users/{user_id}/delete") async def user_delete(request: Request, user_id: int, db=Depends(get_db)): user, perms, redirect = _check_access(request, db) @@ -184,7 +277,11 @@ async def user_delete(request: Request, user_id: int, db=Depends(get_db)): return RedirectResponse(url="/users?msg=forbidden", status_code=303) if user_id == user.get("uid"): return RedirectResponse(url="/users?msg=cant_self", status_code=303) - db.execute(text("DELETE FROM user_permissions WHERE user_id = :uid"), {"uid": user_id}) - db.execute(text("DELETE FROM users WHERE id = :id"), {"id": user_id}) + # Protéger l'admin local (id=1) + row = db.execute(text("SELECT username, auth_type FROM users WHERE id=:id"), {"id": user_id}).fetchone() + if row and row.username == "admin" and row.auth_type == "local": + return RedirectResponse(url="/users?msg=cant_delete_admin", status_code=303) + db.execute(text("DELETE FROM user_permissions WHERE user_id=:uid"), {"uid": user_id}) + db.execute(text("DELETE FROM users WHERE id=:id"), {"id": user_id}) db.commit() return RedirectResponse(url="/users?msg=deleted", status_code=303) diff --git a/app/services/itop_service.py b/app/services/itop_service.py index 2c3cdef..c83a03d 100644 --- a/app/services/itop_service.py +++ b/app/services/itop_service.py @@ -43,19 +43,67 @@ class ITopClient: return self._call("core/create", **{"class": cls, "fields": fields, "comment": "PatchCenter sync"}) +def _normalize_os_for_itop(os_string): + """Normalise PRETTY_NAME vers un nom propre pour iTop OSVersion. + Ex: 'Debian GNU/Linux 12 (bookworm)' → 'Debian 12 (Bookworm)' + 'CentOS Stream 9' → 'CentOS Stream 9' + 'Red Hat Enterprise Linux 9.4 (Plow)' → 'RHEL 9.4 (Plow)' + """ + import re + s = os_string.strip() + # Debian GNU/Linux X (codename) → Debian X (Codename) + m = re.match(r'Debian GNU/Linux (\d+)\s*\((\w+)\)', s, re.I) + if m: + return f"Debian {m.group(1)} ({m.group(2).capitalize()})" + # Ubuntu X.Y LTS + m = re.match(r'Ubuntu (\d+\.\d+(?:\.\d+)?)\s*(LTS)?', s, re.I) + if m: + lts = " LTS" if m.group(2) else "" + return f"Ubuntu {m.group(1)}{lts}" + # Red Hat Enterprise Linux [Server] [release] X → RHEL X + m = re.match(r'Red Hat Enterprise Linux\s*(?:Server)?\s*(?:release)?\s*(\d+[\.\d]*)\s*\(?([\w]*)\)?', s, re.I) + if m: + codename = f" ({m.group(2).capitalize()})" if m.group(2) else "" + return f"RHEL {m.group(1)}{codename}" + # CentOS Stream release X → CentOS Stream X + m = re.match(r'CentOS Stream\s+release\s+(\d+[\.\d]*)', s, re.I) + if m: + return f"CentOS Stream {m.group(1)}" + # CentOS Linux X.Y → CentOS X.Y + m = re.match(r'CentOS\s+Linux\s+(\d+[\.\d]*)', s, re.I) + if m: + return f"CentOS {m.group(1)}" + # Rocky Linux X.Y (codename) → Rocky Linux X.Y + m = re.match(r'Rocky\s+Linux\s+(\d+[\.\d]*)', s, re.I) + if m: + return f"Rocky Linux {m.group(1)}" + # Oracle Linux / Oracle Enterprise Linux + m = re.match(r'Oracle\s+(?:Enterprise\s+)?Linux\s+(\d+[\.\d]*)', s, re.I) + if m: + return f"Oracle Linux {m.group(1)}" + # Fallback: remove "release" word + return re.sub(r'\s+release\s+', ' ', s).strip() + + def _upsert_ip(db, server_id, ip): if not ip: return - existing = db.execute(text( + # Remove all old itop-managed primary IPs for this server (keep only the current one) + db.execute(text( + "DELETE FROM server_ips WHERE server_id=:sid AND ip_type='primary' AND description='itop' AND ip_address != :ip"), + {"sid": server_id, "ip": ip}) + # Check if this exact IP already exists + exact = db.execute(text( "SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=:ip"), {"sid": server_id, "ip": ip}).fetchone() - if not existing: - try: - db.execute(text( - "INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'itop')"), - {"sid": server_id, "ip": ip}) - except Exception: - pass + if exact: + return + try: + db.execute(text( + "INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'itop')"), + {"sid": server_id, "ip": ip}) + except Exception: + pass def _save_sync_timestamp(db, direction, stats): @@ -154,11 +202,20 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): except Exception: db.rollback() - # ─── 5. Contacts + Teams ─── - persons = client.get_all("Person", "name,first_name,email,phone,org_name") + # ─── 5. Contacts + Teams (filtre périmètre IT uniquement) ─── + persons = client.get_all("Person", "name,first_name,email,phone,org_name,function,status") + + # Périmètre IT : teams à synchroniser (configurable via settings) + from .secrets_service import get_secret as _gs + it_teams_raw = _gs(db, "itop_contact_teams") + it_teams_list = ["SecOps", "iPOP", "Externe", "DSI", "Admin DSI"] + if it_teams_raw: + it_teams_list = [t.strip() for t in it_teams_raw.split(",") if t.strip()] + + # Map person → (itop_id, team) + person_info = {} # fullname_lower -> {itop_id, team} + team_members = {} # pour responsables domaine×env plus bas - # Get team memberships to determine role - team_members = {} # person_fullname_lower -> team_name teams = client.get_all("Team", "name,persons_list") for t in teams: team_name = t.get("name", "") @@ -167,29 +224,67 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): if pname: team_members[pname] = team_name - team_role_map = {"SecOps": "referent_technique", "iPOP": "responsable_applicatif", "Externe": "referent_technique"} + team_role_map = {"SecOps": "referent_technique", "iPOP": "responsable_applicatif", + "Externe": "referent_technique", "DSI": "referent_technique", + "Admin DSI": "referent_technique"} + + # Set des itop_id et emails vus dans le périmètre IT (pour désactiver les autres) + seen_itop_ids = set() + seen_emails = set() + stats["contacts_deactivated"] = 0 for p in persons: fullname = f"{p.get('first_name','')} {p.get('name','')}".strip() email = p.get("email", "") if not email: continue - # Determine role from team team = team_members.get(fullname.lower(), "") + # Filtre : ne synchroniser que les persons dans le périmètre IT + if team not in it_teams_list: + continue role = team_role_map.get(team, "referent_technique") + itop_id = p.get("itop_id") + phone = p.get("phone", "") + function = p.get("function", "") + # Status iTop : active/inactive + itop_status = (p.get("status", "") or "").lower() + is_active = itop_status != "inactive" + + if itop_id: + seen_itop_ids.add(int(itop_id)) + seen_emails.add(email.lower()) existing = db.execute(text("SELECT id FROM contacts WHERE LOWER(email)=LOWER(:e)"), {"e": email}).fetchone() if existing: - db.execute(text("UPDATE contacts SET name=:n, role=:r, updated_at=NOW() WHERE id=:id"), - {"id": existing.id, "n": fullname, "r": role}) + db.execute(text("""UPDATE contacts SET name=:n, role=:r, itop_id=:iid, + telephone=:tel, team=:t, function=:f, is_active=:a, updated_at=NOW() WHERE id=:id"""), + {"id": existing.id, "n": fullname, "r": role, "iid": itop_id, + "tel": phone, "t": team, "f": function, "a": is_active}) else: try: - db.execute(text("INSERT INTO contacts (name, email, role) VALUES (:n, :e, :r)"), - {"n": fullname, "e": email, "r": role}) + db.execute(text("""INSERT INTO contacts (name, email, role, itop_id, + telephone, team, function, is_active) VALUES (:n, :e, :r, :iid, :tel, :t, :f, :a)"""), + {"n": fullname, "e": email, "r": role, "iid": itop_id, + "tel": phone, "t": team, "f": function, "a": is_active}) stats["contacts"] += 1 except Exception: db.rollback() + # Désactiver les contacts iTop qui ne sont plus dans le périmètre (plus dans les teams IT) + # Critère : a un itop_id mais n'a pas été vu dans le sync + if seen_itop_ids: + placeholders = ",".join(str(i) for i in seen_itop_ids) + r = db.execute(text(f"""UPDATE contacts SET is_active=false, updated_at=NOW() + WHERE itop_id IS NOT NULL AND itop_id NOT IN ({placeholders}) AND is_active=true""")) + stats["contacts_deactivated"] = r.rowcount + + # Désactiver les users PatchCenter liés à des contacts devenus inactifs + stats["users_deactivated"] = 0 + r = db.execute(text("""UPDATE users SET is_active=false, updated_at=NOW() + WHERE is_active=true AND itop_person_id IN + (SELECT itop_id FROM contacts WHERE is_active=false AND itop_id IS NOT NULL)""")) + stats["users_deactivated"] = r.rowcount + # ─── 6. Build lookup maps ─── domain_map = {r.name.lower(): r.id for r in db.execute(text("SELECT id, name FROM domains")).fetchall()} env_map = {r.name.lower(): r.id for r in db.execute(text("SELECT id, name FROM environments")).fetchall()} @@ -206,6 +301,30 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): de_responsables = defaultdict(lambda: {"resp_dom": defaultdict(int), "resp_dom_email": {}, "referent": defaultdict(int), "referent_email": {}}) + # ─── 7bis. ApplicationSolutions ─── + crit_map = {"high": "haute", "critical": "critique", "medium": "standard", "low": "basse"} + itop_apps = client.get_all("ApplicationSolution", "name,description,business_criticity,status") + for app in itop_apps: + iid = app.get("itop_id") + name = (app.get("name") or "")[:50] + full = (app.get("name") or "")[:200] + desc = (app.get("description") or "")[:500] + crit = crit_map.get((app.get("business_criticity") or "").lower(), "basse") + st = (app.get("status") or "active")[:30] + try: + db.execute(text("""INSERT INTO applications (itop_id, nom_court, nom_complet, description, criticite, status) + VALUES (:iid, :n, :nc, :d, :c, :s) + ON CONFLICT (itop_id) DO UPDATE SET nom_court=EXCLUDED.nom_court, + nom_complet=EXCLUDED.nom_complet, description=EXCLUDED.description, + criticite=EXCLUDED.criticite, status=EXCLUDED.status, updated_at=NOW()"""), + {"iid": iid, "n": name, "nc": full, "d": desc, "c": crit, "s": st}) + stats["applications"] = stats.get("applications", 0) + 1 + except Exception: + db.rollback() + db.commit() + app_by_itop_id = {r.itop_id: r.id for r in db.execute(text( + "SELECT id, itop_id FROM applications WHERE itop_id IS NOT NULL")).fetchall()} + # ─── 8. VirtualMachines ─── vms = client.get_all("VirtualMachine", "name,description,status,managementip,osfamily_id_friendlyname," @@ -215,10 +334,12 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): "contacts_list,virtualhost_name,business_criticity," "tier_name,connexion_method_name,ssh_user_name," "patch_frequency_name,pref_patch_jour_name,patch_window," - "patch_excludes,domain_ldap_name,last_patch_date") + "patch_excludes,domain_ldap_name,last_patch_date," + "applicationsolution_list") - itop_status = {"production": "en_production", "stock": "stock", - "implementation": "en_cours", "obsolete": "decommissionne"} + # PatchCenter etat = iTop status (meme enum: production, implementation, stock, obsolete) + itop_status = {"production": "production", "stock": "stock", + "implementation": "implementation", "obsolete": "obsolete"} for v in vms: hostname = v.get("name", "").split(".")[0].lower() @@ -264,12 +385,25 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): resp_srv_name = v.get("responsable_serveur_name", "") resp_dom_name = v.get("responsable_domaine_name", "") + # ApplicationSolution (première app si plusieurs) + app_id = None + app_name = None + apps_list = v.get("applicationsolution_list") or [] + if apps_list: + first = apps_list[0] + try: + itop_aid = int(first.get("applicationsolution_id", 0)) + app_id = app_by_itop_id.get(itop_aid) + app_name = first.get("applicationsolution_name", "") + except (ValueError, TypeError): + pass + vals = { "hostname": hostname, "fqdn": v.get("name", hostname), "os_family": "linux" if "linux" in v.get("osfamily_id_friendlyname", "").lower() else "windows", "os_version": v.get("osversion_id_friendlyname", ""), "machine_type": "vm", - "etat": itop_status.get(v.get("status", ""), "en_production"), + "etat": itop_status.get(v.get("status", ""), "production"), "de_id": de_id, "zone_id": zone_id, "resp_srv": resp_srv_name, "resp_srv_email": person_email.get(resp_srv_name.lower(), ""), @@ -282,6 +416,7 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): "patch_freq": patch_freq, "patch_excludes": v.get("patch_excludes", ""), "domain_ltd": v.get("domain_ldap_name", ""), "pref_jour": pref_jour, "pref_heure": pref_heure, + "app_id": app_id, "app_name": app_name, } existing = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), {"h": hostname}).fetchone() @@ -293,6 +428,7 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): tier=:tier, ssh_method=:ssh_method, ssh_user=:ssh_user, patch_frequency=:patch_freq, patch_excludes=:patch_excludes, domain_ltd=:domain_ltd, pref_patch_jour=:pref_jour, pref_patch_heure=:pref_heure, + application_id=:app_id, application_name=:app_name, updated_at=NOW() WHERE id=:sid"""), {**vals, "sid": existing.id}) if vals["ip"]: _upsert_ip(db, existing.id, vals["ip"]) @@ -341,22 +477,33 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass): hostname = s.get("name", "").split(".")[0].lower() if not hostname: continue + contacts = s.get("contacts_list", []) + resp = contacts[0].get("contact_id_friendlyname", "") if contacts else "" + ip = s.get("managementip", "") + osf = "linux" if "linux" in s.get("osfamily_id_friendlyname", "").lower() else "windows" + osv = s.get("osversion_id_friendlyname", "") + existing = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), {"h": hostname}).fetchone() - if not existing: + if existing: + db.execute(text("""UPDATE servers SET fqdn=:f, os_family=:osf, os_version=:osv, + responsable_nom=:resp, commentaire=:desc, site=:site, updated_at=NOW() + WHERE id=:sid"""), + {"f": s.get("name", hostname), "osf": osf, "osv": osv, + "resp": resp, "desc": s.get("description", ""), + "site": s.get("location_name", ""), "sid": existing.id}) + if ip: + _upsert_ip(db, existing.id, ip) + stats["servers_updated"] += 1 + else: try: - contacts = s.get("contacts_list", []) - resp = contacts[0].get("contact_id_friendlyname", "") if contacts else "" db.execute(text("""INSERT INTO servers (hostname, fqdn, os_family, os_version, machine_type, etat, responsable_nom, commentaire, site, ssh_port, ssh_user, ssh_method, tier) - VALUES (:h, :f, :osf, :osv, 'physical', 'en_production', :resp, :desc, :site, + VALUES (:h, :f, :osf, :osv, 'physical', 'production', :resp, :desc, :site, 22, 'root', 'ssh_key', 'tier0')"""), - {"h": hostname, "f": s.get("name", hostname), - "osf": "linux" if "linux" in s.get("osfamily_id_friendlyname", "").lower() else "windows", - "osv": s.get("osversion_id_friendlyname", ""), + {"h": hostname, "f": s.get("name", hostname), "osf": osf, "osv": osv, "resp": resp, "desc": s.get("description", ""), "site": s.get("location_name", "")}) db.flush() - ip = s.get("managementip", "") if ip: new_srv = db.execute(text("SELECT id FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone() if new_srv: @@ -414,26 +561,42 @@ def sync_to_itop(db, itop_url, itop_user, itop_pass): for v in client.get_all("VirtualMachine", "name"): itop_vms[v["name"].split(".")[0].lower()] = v - status_map = {"en_production": "production", "decommissionne": "obsolete", - "stock": "stock", "en_cours": "implementation"} + status_map = {"production": "production", "implementation": "implementation", + "stock": "stock", "obsolete": "obsolete"} tier_map = {"tier0": "Tier 0", "tier1": "Tier 1", "tier2": "Tier 2", "tier3": "Tier 3"} + # Build OSVersion cache: name.lower() → itop_id + itop_osversions = {} + for ov in client.get_all("OSVersion", "name,osfamily_name"): + itop_osversions[ov["name"].lower()] = ov + + # Build OSFamily cache + itop_osfamilies = {} + for of in client.get_all("OSFamily", "name"): + itop_osfamilies[of["name"].lower()] = of["itop_id"] + # Build Person name → itop_id lookup for responsable sync itop_persons = {} for p in client.get_all("Person", "name,first_name"): fullname = f"{p.get('first_name','')} {p.get('name','')}".strip() itop_persons[fullname.lower()] = p["itop_id"] - rows = db.execute(text("""SELECT hostname, fqdn, os_version, etat, commentaire, tier, - ssh_method, ssh_user, patch_frequency, patch_excludes, domain_ltd, - pref_patch_jour, pref_patch_heure, responsable_nom, referent_nom - FROM servers WHERE machine_type='vm'""")).fetchall() + rows = db.execute(text("""SELECT s.hostname, s.fqdn, s.os_version, s.os_family, s.etat, s.commentaire, s.tier, + s.ssh_method, s.ssh_user, s.patch_frequency, s.patch_excludes, s.domain_ltd, + s.pref_patch_jour, s.pref_patch_heure, s.responsable_nom, s.referent_nom, + s.application_id, + (SELECT si.ip_address::text FROM server_ips si WHERE si.server_id = s.id AND si.ip_type = 'primary' LIMIT 1) as mgmt_ip, + (SELECT sa.audit_date FROM server_audit sa WHERE sa.server_id = s.id ORDER BY sa.audit_date DESC LIMIT 1) as last_audit_date, + (SELECT a.itop_id FROM applications a WHERE a.id = s.application_id) as app_itop_id + FROM servers s WHERE s.machine_type='vm'""")).fetchall() for srv in rows: hostname = (srv.hostname or "").lower() itop_vm = itop_vms.get(hostname) fields = {} + if srv.mgmt_ip: + fields["managementip"] = srv.mgmt_ip.split("/")[0] if srv.etat: fields["status"] = status_map.get(srv.etat, "production") if srv.commentaire: @@ -455,6 +618,29 @@ def sync_to_itop(db, itop_url, itop_user, itop_pass): fields["patch_window"] = srv.pref_patch_heure if srv.domain_ltd: fields["domain_ldap_id"] = f"SELECT DomainLdap WHERE name = '{srv.domain_ltd}'" + # Date dernier audit + if srv.last_audit_date: + fields["last_patch_date"] = str(srv.last_audit_date)[:10] + # ApplicationSolution : pousser si défini (replace la liste) + if srv.app_itop_id: + fields["applicationsolution_list"] = [{"applicationsolution_id": int(srv.app_itop_id)}] + # OS version — chercher/créer dans iTop + if srv.os_version: + osv_name = _normalize_os_for_itop(srv.os_version) + osf_name = "Linux" if srv.os_family == "linux" else "Windows" if srv.os_family == "windows" else "Linux" + match = itop_osversions.get(osv_name.lower()) + if match: + fields["osversion_id"] = match["itop_id"] + else: + # Créer l'OSVersion dans iTop + osf_id = itop_osfamilies.get(osf_name.lower()) + if osf_id: + cr = client.create("OSVersion", {"name": osv_name, "osfamily_id": osf_id}) + if cr.get("code") == 0 and cr.get("objects"): + new_id = list(cr["objects"].values())[0]["key"] + fields["osversion_id"] = new_id + itop_osversions[osv_name.lower()] = {"itop_id": new_id, "name": osv_name} + stats["ref_created"] += 1 # Responsable serveur if srv.responsable_nom: pid = itop_persons.get(srv.responsable_nom.lower()) diff --git a/app/services/ldap_service.py b/app/services/ldap_service.py new file mode 100644 index 0000000..09ceec0 --- /dev/null +++ b/app/services/ldap_service.py @@ -0,0 +1,113 @@ +"""Service LDAP/AD — authentification via annuaire. + +Configuration via settings (clés) : + - ldap_enabled : "true" / "false" + - ldap_server : URL du serveur (ex: ldaps://ad.sanef.com:636) + - ldap_base_dn : DN de base (ex: DC=sanef,DC=com) + - ldap_bind_dn : DN du compte de bind (ex: CN=svc_pc,OU=SVC,DC=sanef,DC=com) + - ldap_bind_pwd : mot de passe (stocké chiffré via secrets_service) + - ldap_user_filter : filtre utilisateur (ex: (sAMAccountName={username})) + - ldap_tls : "true" / "false" +""" +import logging +from sqlalchemy import text + +log = logging.getLogger(__name__) + +try: + from ldap3 import Server, Connection, ALL, NTLM, SIMPLE, Tls + import ssl + LDAP_OK = True +except ImportError: + LDAP_OK = False + + +def _get_config(db): + """Retourne la config LDAP depuis app_secrets (via get_secret).""" + from .secrets_service import get_secret + + def s(k, default=""): + return get_secret(db, k) or default + + return { + "enabled": s("ldap_enabled", "false").lower() == "true", + "server": s("ldap_server"), + "base_dn": s("ldap_base_dn"), + "bind_dn": s("ldap_bind_dn"), + "bind_pwd": s("ldap_bind_pwd"), + "user_filter": s("ldap_user_filter", "(sAMAccountName={username})"), + "tls": s("ldap_tls", "true").lower() == "true", + "email_attr": s("ldap_email_attr", "mail"), + "name_attr": s("ldap_name_attr", "displayName"), + } + + +def is_enabled(db): + """LDAP activé et configuré ?""" + cfg = _get_config(db) + return cfg["enabled"] and cfg["server"] and cfg["base_dn"] + + +def authenticate(db, username, password): + """Tente l'authentification LDAP. + Retourne dict {ok, email, name, dn} ou {ok: False, msg: '...'}""" + if not LDAP_OK: + return {"ok": False, "msg": "Module ldap3 non installé"} + + cfg = _get_config(db) + if not cfg["enabled"]: + return {"ok": False, "msg": "LDAP désactivé"} + if not cfg["server"] or not cfg["base_dn"]: + return {"ok": False, "msg": "LDAP non configuré"} + + # 1. Bind service account pour chercher le DN de l'utilisateur + try: + server = Server(cfg["server"], get_info=ALL, use_ssl=cfg["server"].startswith("ldaps://")) + conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True) + except Exception as e: + log.error(f"LDAP bind failed: {e}") + return {"ok": False, "msg": f"Connexion LDAP échouée: {e}"} + + # 2. Recherche de l'utilisateur + user_filter = cfg["user_filter"].replace("{username}", username) + try: + conn.search(cfg["base_dn"], user_filter, + attributes=[cfg["email_attr"], cfg["name_attr"], "distinguishedName"]) + except Exception as e: + conn.unbind() + return {"ok": False, "msg": f"Recherche LDAP échouée: {e}"} + + if not conn.entries: + conn.unbind() + return {"ok": False, "msg": "Utilisateur introuvable dans LDAP"} + + entry = conn.entries[0] + user_dn = str(entry.distinguishedName) if hasattr(entry, "distinguishedName") else entry.entry_dn + email = str(getattr(entry, cfg["email_attr"], "")) or "" + name = str(getattr(entry, cfg["name_attr"], "")) or username + conn.unbind() + + # 3. Bind avec les credentials fournis + try: + user_conn = Connection(server, user=user_dn, password=password, auto_bind=True) + user_conn.unbind() + except Exception as e: + return {"ok": False, "msg": "Mot de passe incorrect"} + + return {"ok": True, "dn": user_dn, "email": email, "name": name} + + +def test_connection(db): + """Test la connexion LDAP avec le compte de bind (pour admin).""" + if not LDAP_OK: + return {"ok": False, "msg": "Module ldap3 non installé"} + cfg = _get_config(db) + if not cfg["server"]: + return {"ok": False, "msg": "Serveur non configuré"} + try: + server = Server(cfg["server"], get_info=ALL, use_ssl=cfg["server"].startswith("ldaps://")) + conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"], auto_bind=True) + conn.unbind() + return {"ok": True, "msg": "Connexion réussie"} + except Exception as e: + return {"ok": False, "msg": str(e)[:200]} diff --git a/app/services/profile_service.py b/app/services/profile_service.py new file mode 100644 index 0000000..59c4788 --- /dev/null +++ b/app/services/profile_service.py @@ -0,0 +1,58 @@ +"""Profils utilisateurs PatchCenter — mapping role → permissions pré-définies. + +4 profils : + - admin : tout (view/edit/admin sur tous les modules) + - coordinator : SecOps + coordination (Patcheur + gestion campagnes/planning) + - operator : Patcheur (intervenant SecOps — exécution patching) + - viewer : Invité (view-only : dashboard, servers, qualys, audit) +""" + +# Matrix profil → {module: level} +# level: "view" | "edit" | "admin" +PROFILES = { + "admin": { + "dashboard": "admin", "servers": "admin", "campaigns": "admin", + "planning": "admin", "specifics": "admin", "audit": "admin", + "contacts": "admin", "qualys": "admin", "quickwin": "admin", + "users": "admin", "settings": "admin", "referentiel": "admin", + }, + # Coordinateur = SecOps + gestion campagnes/planning + "coordinator": { + "dashboard": "view", "servers": "edit", "campaigns": "admin", + "planning": "edit", "specifics": "edit", "audit": "edit", + "contacts": "view", "qualys": "edit", "quickwin": "admin", + "users": "view", "referentiel": "view", + }, + # Patcheur = intervenant SecOps + "operator": { + "dashboard": "view", "servers": "view", "campaigns": "view", + "planning": "view", "audit": "edit", "qualys": "view", + "quickwin": "edit", "contacts": "view", + }, + # Invité = view-only (pas d'accès à l'audit) + "viewer": { + "dashboard": "view", "servers": "view", "qualys": "view", + "contacts": "view", "planning": "view", "quickwin": "view", + }, +} + + +def get_profile_perms(role: str) -> dict: + """Retourne les permissions pour un profil donné.""" + return dict(PROFILES.get(role, {})) + + +PROFILE_LABELS = { + "admin": "Admin", + "coordinator": "Coordinateur", + "operator": "Patcheur", + "viewer": "Invité", +} + + +PROFILE_DESCRIPTIONS = { + "admin": "Accès complet : gestion des utilisateurs, paramètres, tous les modules en admin", + "coordinator": "SecOps + coordination : gestion des campagnes, planning, exécution patching", + "operator": "Patcheur (intervenant SecOps) : exécution du patching, audit des serveurs", + "viewer": "Invité : consultation en lecture seule (dashboard, serveurs, Qualys, audit)", +} diff --git a/app/templates/base.html b/app/templates/base.html index 7bdde1e..8aa27b5 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -36,6 +36,7 @@ .inline-edit:focus { background: #0a0e17; border-color: #00d4ff; } .toast { position: fixed; bottom: 20px; right: 20px; padding: 12px 24px; border-radius: 8px; z-index: 1000; animation: fadeIn 0.3s; } @keyframes fadeIn { from { opacity: 0; transform: translateY(10px); } to { opacity: 1; transform: translateY(0); } } + [x-cloak] { display: none !important; } @@ -49,31 +50,111 @@

PatchCenter

v2.0 — SecOps

-