import logging logger = logging.getLogger(__name__) """Router settings — configuration modules externes + connexions""" from fastapi import APIRouter, Request, Depends, Form from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.templating import Jinja2Templates from sqlalchemy import text from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit from ..services.secrets_service import get_secret, set_secret, list_secrets, init_secrets_from_config from ..config import APP_NAME router = APIRouter() templates = Jinja2Templates(directory="app/templates") SECTIONS = { "qualys": [ ("qualys_url", "URL API", False), ("qualys_user", "Utilisateur", False), ("qualys_pass", "Mot de passe", True), ("qualys_proxy", "Proxy", False), ("qualys_bypass_proxy", "Bypass proxy (true/false)", False), ], "itop": [ ("itop_url", "URL API", False), ("itop_user", "Utilisateur", False), ("itop_pass", "Mot de passe", True), ], "ssh_key": [ ("ssh_key_default_user", "User SSH par defaut", False), ("ssh_key_default_port", "Port par defaut", False), ("ssh_key_private_key", "Cle privee (PEM)", True), ], "ssh_pwd": [ ("ssh_pwd_default_user", "User par defaut", False), ("ssh_pwd_default_pass", "Password par defaut", True), ], "ssh_psmp": [ ("psmp_host", "Adresse PSMP", False), ("psmp_port", "Port PSMP", False), ("psmp_user_format", "Format user", False), ("psmp_cyberark_user", "Compte CyberArk", False), ("psmp_target_user", "Utilisateur cible", False), ("psmp_default_safe", "Safe par defaut", False), ], "rdp_psm": [ ("rdp_psm_pvwa_url", "URL PVWA", False), ("rdp_psm_pvwa_user", "User PVWA", False), ("rdp_psm_pvwa_pass", "Password PVWA", True), ("rdp_psm_component", "Connection Component", False), ], "rdp_pwd": [ ("rdp_pwd_default_user", "User par defaut", False), ("rdp_pwd_default_pass", "Password par defaut", True), ("rdp_pwd_default_port", "Port RDP", False), ], "vsphere": [ ("vsphere_user", "Utilisateur vCenter", False), ("vsphere_pass", "Mot de passe vCenter", True), ], "security": [ ("security_session_timeout", "Timeout session (minutes)", False), ("security_max_login_attempts", "Max tentatives login", False), ], "splunk": [ ("splunk_hec_url", "URL HEC", False), ("splunk_hec_token", "Token HEC", True), ("splunk_index", "Index", False), ("splunk_sourcetype", "Sourcetype", False), ("splunk_verify_ssl", "Verifier SSL (true/false)", False), ], "teams": [ ("sharepoint_notif_path", "Chemin local OneDrive sync (ex: C:\\Users\\xxx\\sanefgroupe...\\notifications)", False), ("teams_webhook_url", "Webhook URL globale (legacy, futur OAuth)", False), ("teams_sp_site_url", "SharePoint Site URL (futur webhook)", False), ("teams_sp_library", "SharePoint Library (futur webhook)", False), ("teams_sp_folder", "SharePoint Folder (futur webhook)", False), ("teams_sp_client_id", "App Client ID (futur webhook OAuth)", False), ("teams_sp_client_secret", "App Client Secret (futur webhook OAuth)", True), ("teams_sp_tenant_id", "Tenant ID (futur webhook OAuth)", False), ], "ldap": [ ("ldap_enabled", "Activer LDAP/AD (true/false)", False), ("ldap_server", "Serveur (ex: ldaps://ad.sanef.com:636)", False), ("ldap_base_dn", "Base DN (ex: DC=sanef,DC=com)", False), ("ldap_bind_dn", "Compte de bind (DN complet)", False), ("ldap_bind_pwd", "Mot de passe compte de bind", True), ("ldap_user_filter", "Filtre user (ex: (sAMAccountName={username}))", False), ("ldap_email_attr", "Attribut email (ex: mail)", False), ("ldap_name_attr", "Attribut nom affiché (ex: displayName)", False), ("ldap_tls", "TLS (true/false)", False), ("ldap_required_group", "Groupe AD autorise (DN complet, vide = tous)", False), ("ldap_default_role", "Role par defaut auto-provision (admin/operator/viewer)", False), ], "itop_contacts": [ ("itop_contact_teams", "Teams iTop à synchroniser (séparées par ,)", False), ], } def _load_section_values(db): vals = {} for section, fields in SECTIONS.items(): for key, label, is_secret in fields: v = get_secret(db, key) if is_secret and v: vals[key] = "********" else: vals[key] = v or "" return vals # Regles d'acces par section: visible = qui peut voir, editable = qui peut modifier SECTION_ACCESS = { "qualys": {"visible": ["admin"], "editable": ["admin"]}, "ssh_key": {"visible": ["admin"], "editable": ["admin"]}, "ssh_pwd": {"visible": ["admin", "operator"], "editable": ["admin", "operator"]}, "ssh_psmp": {"visible": ["admin", "operator"], "editable": ["admin", "operator"]}, "rdp_psm": {"visible": ["admin"], "editable": ["admin"]}, "rdp_pwd": {"visible": [], "editable": []}, "vsphere": {"visible": ["admin", "operator", "coordinator"], "editable": ["admin", "operator"]}, "splunk": {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]}, "teams": {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]}, "itop": {"visible": ["admin"], "editable": ["admin"]}, "itop_contacts": {"visible": ["admin"], "editable": ["admin"]}, "ldap": {"visible": ["admin"], "editable": ["admin"]}, "security": {"visible": ["admin"], "editable": ["admin"]}, } def _build_context(db, user, saved=None): init_secrets_from_config(db) role = user.get("role", "viewer") q_tags = db.execute(text("SELECT COUNT(*) FROM qualys_tags")).scalar() q_assets = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar() q_linked = db.execute(text("SELECT COUNT(*) FROM servers WHERE qualys_asset_id IS NOT NULL")).scalar() vcenters = db.execute(text("SELECT * FROM vcenters ORDER BY name")).fetchall() allowed_nets = db.execute(text("SELECT * FROM allowed_networks ORDER BY cidr")).fetchall() teams_channels = db.execute(text(""" SELECT id, name, webhook_url, description, is_active, is_default, sp_route, mode, is_reboot_channel, is_dynamic_dm, created_at FROM teams_channels ORDER BY is_default DESC, is_reboot_channel DESC, name """)).fetchall() teams_rules = db.execute(text(""" SELECT r.id, r.priority, r.name, r.match_responsable_contact_id, r.match_application_domain, r.match_env_in, r.match_msg_type_in, r.match_hostname_pattern, r.channel_id, r.is_active, r.created_at, tc.name AS channel_name, c.name AS responsable_name FROM teams_channel_rules r LEFT JOIN teams_channels tc ON tc.id = r.channel_id LEFT JOIN contacts c ON c.id = r.match_responsable_contact_id ORDER BY r.priority ASC, r.id ASC """)).fetchall() contacts_list = db.execute(text(""" SELECT id, name, teams_upn FROM contacts WHERE is_active = true ORDER BY name """)).fetchall() server_clusters = db.execute(text(""" SELECT c.id, c.name, c.description, c.reboot_strategy, c.is_active, c.created_at, COUNT(s.id) AS server_count FROM server_clusters c LEFT JOIN servers s ON s.cluster_id = c.id GROUP BY c.id, c.name, c.description, c.reboot_strategy, c.is_active, c.created_at ORDER BY c.name """)).fetchall() # Filtrer les sections visibles selon le role visible = {s: s in SECTION_ACCESS and role in SECTION_ACCESS[s]["visible"] for s in SECTIONS} editable = {s: s in SECTION_ACCESS and role in SECTION_ACCESS[s]["editable"] for s in SECTIONS} return { "user": user, "app_name": APP_NAME, "role": role, "sections": SECTIONS, "vals": _load_section_values(db), "allowed_nets": allowed_nets, "q_tags": q_tags, "q_assets": q_assets, "q_linked": q_linked, "vcenters": vcenters, "saved": saved, "teams_channels": teams_channels, "teams_rules": teams_rules, "contacts_list": contacts_list, "server_clusters": server_clusters, "visible": visible, "editable": editable, } @router.get("/settings", response_class=HTMLResponse) async def settings_page(request: Request, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "settings"): return RedirectResponse(url="/dashboard") ctx = _build_context(db, user) ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) @router.post("/settings/{section}", response_class=HTMLResponse) async def settings_save(request: Request, section: str, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") if section not in SECTIONS: return HTMLResponse("
Section inconnue
", status_code=400) perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/settings") role = user.get("role", "viewer") if section in SECTION_ACCESS and role not in SECTION_ACCESS[section]["editable"]: return RedirectResponse(url="/settings") form = await request.form() for key, label, is_secret in SECTIONS[section]: val = form.get(key, "") if is_secret and val == "********": continue # Checkbox: si absent du form = "false" if key.endswith("_bypass_proxy") or key.endswith("_verify_ssl"): val = "true" if val else "false" set_secret(db, key, val, label) elif val: set_secret(db, key, val, label) ctx = _build_context(db, user, saved=section) ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) @router.post("/settings/ldap/test") async def settings_ldap_test(request: Request, db=Depends(get_db)): """Teste la connexion LDAP avec le compte de bind.""" from fastapi.responses import JSONResponse user = get_current_user(request) if not user: return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401) perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403) from ..services.ldap_service import test_connection return JSONResponse(test_connection(db)) # --- vCenter CRUD --- @router.post("/settings/vcenter/add", response_class=HTMLResponse) async def vcenter_add(request: Request, db=Depends(get_db), vc_name: str = Form(...), vc_endpoint: str = Form(...), vc_datacenter: str = Form(""), vc_description: str = Form(""), vc_responsable: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/settings") db.execute(text( "INSERT INTO vcenters (name, endpoint, datacenter, description, responsable) VALUES (:n, :e, :dc, :desc, :resp)" ), {"n": vc_name, "e": vc_endpoint, "dc": vc_datacenter or None, "desc": vc_description or None, "resp": vc_responsable or None}) db.commit() ctx = _build_context(db, user, saved="vsphere") ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) @router.post("/settings/vcenter/{vc_id}/delete", response_class=HTMLResponse) async def vcenter_delete(request: Request, vc_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/settings") db.execute(text("UPDATE vcenters SET is_active = false WHERE id = :id"), {"id": vc_id}) db.commit() ctx = _build_context(db, user, saved="vsphere") ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) # --- Teams channels CRUD --- @router.post("/settings/teams-channel/add", response_class=HTMLResponse) async def teams_channel_add(request: Request, db=Depends(get_db), tc_name: str = Form(...), tc_mode: str = Form("sharepoint"), tc_sp_route: str = Form(""), tc_webhook_url: str = Form(""), tc_description: str = Form(""), tc_is_default: str = Form(""), tc_is_reboot_channel: str = Form(""), tc_is_dynamic_dm: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/settings") mode = tc_mode if tc_mode in ("sharepoint", "webhook") else "sharepoint" is_def = bool(tc_is_default) is_reboot = bool(tc_is_reboot_channel) is_dyn_dm = bool(tc_is_dynamic_dm) if is_def: db.execute(text("UPDATE teams_channels SET is_default=false WHERE is_default=true")) if is_reboot: db.execute(text("UPDATE teams_channels SET is_reboot_channel=false WHERE is_reboot_channel=true")) db.execute(text(""" INSERT INTO teams_channels (name, mode, sp_route, webhook_url, description, is_default, is_reboot_channel, is_dynamic_dm) VALUES (:n, :m, :sp, :u, :d, :df, :rb, :dyn) """), {"n": tc_name.strip(), "m": mode, "sp": (tc_sp_route.strip() or None), "u": (tc_webhook_url.strip() or None), "d": (tc_description or None), "df": is_def, "rb": is_reboot, "dyn": is_dyn_dm}) db.commit() ctx = _build_context(db, user, saved="teams") ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) @router.post("/settings/teams-channel/{tc_id}/edit", response_class=HTMLResponse) async def teams_channel_edit(request: Request, tc_id: int, db=Depends(get_db), tc_name: str = Form(...), tc_mode: str = Form("sharepoint"), tc_sp_route: str = Form(""), tc_webhook_url: str = Form(""), tc_description: str = Form(""), tc_is_active: str = Form(""), tc_is_default: str = Form(""), tc_is_reboot_channel: str = Form(""), tc_is_dynamic_dm: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/settings") mode = tc_mode if tc_mode in ("sharepoint", "webhook") else "sharepoint" is_def = bool(tc_is_default) is_act = bool(tc_is_active) is_reboot = bool(tc_is_reboot_channel) is_dyn_dm = bool(tc_is_dynamic_dm) if is_def: db.execute(text("UPDATE teams_channels SET is_default=false WHERE is_default=true AND id<>:id"), {"id": tc_id}) if is_reboot: db.execute(text("UPDATE teams_channels SET is_reboot_channel=false WHERE is_reboot_channel=true AND id<>:id"), {"id": tc_id}) db.execute(text(""" UPDATE teams_channels SET name=:n, mode=:m, sp_route=:sp, webhook_url=:u, description=:d, is_default=:df, is_active=:ia, is_reboot_channel=:rb, is_dynamic_dm=:dyn WHERE id=:id """), {"n": tc_name.strip(), "m": mode, "sp": (tc_sp_route.strip() or None), "u": (tc_webhook_url.strip() or None), "d": (tc_description or None), "df": is_def, "ia": is_act, "rb": is_reboot, "dyn": is_dyn_dm, "id": tc_id}) db.commit() ctx = _build_context(db, user, saved="teams") ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) @router.post("/settings/teams-channel/{tc_id}/delete", response_class=HTMLResponse) async def teams_channel_delete(request: Request, tc_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/settings") db.execute(text("DELETE FROM teams_channels WHERE id=:id"), {"id": tc_id}) db.commit() ctx = _build_context(db, user, saved="teams") ctx["request"] = request return templates.TemplateResponse("settings.html", ctx) @router.post("/settings/teams-channel/{tc_id}/test") async def teams_channel_test(request: Request, tc_id: int, db=Depends(get_db)): """Test du canal selon son mode : - mode='sharepoint' : écrit un fichier de test dans