patchcenter/app/routers/settings.py
Admin MPCZ 00998e9320 feat(pct): bouton Prevenance PCT + preview avant envoi + CC responsables/referents
- Service mail_service.py: send_html_mail via SMTP standard (host/port/user/pass/from/use_tls
  depuis Settings > SMTP). Gere SSL_465 et STARTTLS_587. Mode dry_run pour preview.
- Settings: nouvelle section 'smtp' avec smtp_host/port/user/pass/from/use_tls/pct_recipient
  (a configurer pour O365 SMTP submission)
- Router planning_import.py:
  * _build_pct_email(): construit subject + HTML pro/colore (header bleu degrade SANEF,
    cards avec border-left bleu/orange, tableau serveurs, footer)
  * Subject: 'Intervention sur <app>' si app uniforme, sinon liste des serveurs
  * Plage horaire = 20 min × N serveurs (formattee Hh MM)
  * 'Moyen d'exploitation prevu : Rollback en cas de probleme' ajoute en bas
  * _fetch_pct_cc_emails(): query distinct contacts depuis responsable_domaine_contact_id
    + referent_technique_contact_id + server_additional_referents
  * Endpoint POST /patching/import/pct-prevenance/preview retourne {subject, html, to, cc,
    smtp_configured, row_count} sans envoyer
  * Endpoint POST /patching/import/pct-prevenance/send envoie reellement, audit log,
    update pct_mail_sent_at sur les rows
- Template patching_import.html:
  * Bouton 'Prevenance PCT' (violet) a cote des autres actions
  * Modal preview avec iframe sandboxe pour le rendu HTML mail
  * Affiche destinataires, CC, objet, count serveurs
  * Warning rouge si SMTP non configure (envoi desactive, preview seulement)
  * 2 boutons: Annuler / Envoyer (avec confirmation)
2026-05-07 21:44:02 +02:00

777 lines
35 KiB
Python

import logging
logger = logging.getLogger(__name__)
"""Router settings — configuration modules externes + connexions"""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit
from ..services.secrets_service import get_secret, set_secret, list_secrets, init_secrets_from_config
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
# Declarative description of the settings form.
# Maps a section name to its ordered list of fields; each field is a
# (secret key, UI label, is_secret) tuple. Fields flagged is_secret are
# masked when their stored value is sent back to the page.
SECTIONS = {
    # Qualys API access
    "qualys": [
        ("qualys_url", "URL API", False),
        ("qualys_user", "Utilisateur", False),
        ("qualys_pass", "Mot de passe", True),
        ("qualys_proxy", "Proxy", False),
        ("qualys_bypass_proxy", "Bypass proxy (true/false)", False),
    ],
    # iTop API access
    "itop": [
        ("itop_url", "URL API", False),
        ("itop_user", "Utilisateur", False),
        ("itop_pass", "Mot de passe", True),
    ],
    # SSH with a private key
    "ssh_key": [
        ("ssh_key_default_user", "User SSH par defaut", False),
        ("ssh_key_default_port", "Port par defaut", False),
        ("ssh_key_private_key", "Cle privee (PEM)", True),
    ],
    # SSH with a password
    "ssh_pwd": [
        ("ssh_pwd_default_user", "User par defaut", False),
        ("ssh_pwd_default_pass", "Password par defaut", True),
    ],
    # SSH through PSMP
    "ssh_psmp": [
        ("psmp_host", "Adresse PSMP", False),
        ("psmp_port", "Port PSMP", False),
        ("psmp_user_format", "Format user", False),
        ("psmp_cyberark_user", "Compte CyberArk", False),
        ("psmp_target_user", "Utilisateur cible", False),
        ("psmp_default_safe", "Safe par defaut", False),
    ],
    # RDP through PSM / PVWA
    "rdp_psm": [
        ("rdp_psm_pvwa_url", "URL PVWA", False),
        ("rdp_psm_pvwa_user", "User PVWA", False),
        ("rdp_psm_pvwa_pass", "Password PVWA", True),
        ("rdp_psm_component", "Connection Component", False),
    ],
    # RDP with a password
    "rdp_pwd": [
        ("rdp_pwd_default_user", "User par defaut", False),
        ("rdp_pwd_default_pass", "Password par defaut", True),
        ("rdp_pwd_default_port", "Port RDP", False),
    ],
    # vCenter credentials
    "vsphere": [
        ("vsphere_user", "Utilisateur vCenter", False),
        ("vsphere_pass", "Mot de passe vCenter", True),
    ],
    # Session / login policy
    "security": [
        ("security_session_timeout", "Timeout session (minutes)", False),
        ("security_max_login_attempts", "Max tentatives login", False),
    ],
    # Splunk HTTP Event Collector
    "splunk": [
        ("splunk_hec_url", "URL HEC", False),
        ("splunk_hec_token", "Token HEC", True),
        ("splunk_index", "Index", False),
        ("splunk_sourcetype", "Sourcetype", False),
        ("splunk_verify_ssl", "Verifier SSL (true/false)", False),
    ],
    # Teams / SharePoint notifications
    "teams": [
        ("sharepoint_notif_path", "Chemin local OneDrive sync (ex: C:\\Users\\xxx\\sanefgroupe...\\notifications)", False),
        ("teams_webhook_url", "Webhook URL globale (legacy, futur OAuth)", False),
        ("teams_sp_site_url", "SharePoint Site URL (futur webhook)", False),
        ("teams_sp_library", "SharePoint Library (futur webhook)", False),
        ("teams_sp_folder", "SharePoint Folder (futur webhook)", False),
        ("teams_sp_client_id", "App Client ID (futur webhook OAuth)", False),
        ("teams_sp_client_secret", "App Client Secret (futur webhook OAuth)", True),
        ("teams_sp_tenant_id", "Tenant ID (futur webhook OAuth)", False),
    ],
    # Outgoing mail
    "smtp": [
        ("smtp_host", "Serveur SMTP (ex: vpdsismtp1.sanef.groupe)", False),
        ("smtp_port", "Port SMTP (25 / 465 / 587)", False),
        ("smtp_user", "User SMTP (vide si relay anonyme)", False),
        ("smtp_pass", "Password SMTP (vide si relay anonyme)", True),
        ("smtp_from", "From (ex: patchcenter@sanef.com)", False),
        ("smtp_use_tls", "Utiliser TLS/STARTTLS (true/false)", False),
        ("smtp_pct_recipient", "Destinataire prévenance PCT (ex: PCT.reims@sanef.com)", False),
    ],
    # LDAP / Active Directory authentication
    "ldap": [
        ("ldap_enabled", "Activer LDAP/AD (true/false)", False),
        ("ldap_server", "Serveur (ex: ldaps://ad.sanef.com:636)", False),
        ("ldap_base_dn", "Base DN (ex: DC=sanef,DC=com)", False),
        ("ldap_bind_dn", "Compte de bind (DN complet)", False),
        ("ldap_bind_pwd", "Mot de passe compte de bind", True),
        ("ldap_user_filter", "Filtre user (ex: (sAMAccountName={username}))", False),
        ("ldap_email_attr", "Attribut email (ex: mail)", False),
        ("ldap_name_attr", "Attribut nom affiché (ex: displayName)", False),
        ("ldap_tls", "TLS (true/false)", False),
        ("ldap_required_group", "Groupe AD autorise (DN complet, vide = tous)", False),
        ("ldap_default_role", "Role par defaut auto-provision (admin/operator/viewer)", False),
    ],
    # iTop contact synchronisation
    "itop_contacts": [
        ("itop_contact_teams", "Teams iTop à synchroniser (séparées par ,)", False),
    ],
}
def _load_section_values(db):
    """Return a ``{key: display value}`` dict for every field in SECTIONS.

    Each key is looked up with ``get_secret``. A secret field that has a
    stored value is shown as a fixed mask; every other field shows its
    stored value, or an empty string when nothing is stored.
    """
    mask = "********"
    display = {}
    for fields in SECTIONS.values():
        for key, _label, is_secret in fields:
            stored = get_secret(db, key)
            display[key] = mask if (is_secret and stored) else (stored or "")
    return display
# Per-section access rules, keyed by section name:
#   "visible"  -> roles allowed to see the section
#   "editable" -> roles allowed to modify the section
SECTION_ACCESS = {
    "qualys": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
    "ssh_key": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
    "ssh_pwd": {
        "visible": ["admin", "operator"],
        "editable": ["admin", "operator"],
    },
    "ssh_psmp": {
        "visible": ["admin", "operator"],
        "editable": ["admin", "operator"],
    },
    "rdp_psm": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
    # Empty lists: no role can see or edit this section.
    "rdp_pwd": {
        "visible": [],
        "editable": [],
    },
    "vsphere": {
        "visible": ["admin", "operator", "coordinator"],
        "editable": ["admin", "operator"],
    },
    "splunk": {
        "visible": ["admin", "coordinator"],
        "editable": ["admin", "coordinator"],
    },
    "teams": {
        "visible": ["admin", "coordinator"],
        "editable": ["admin", "coordinator"],
    },
    "itop": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
    "itop_contacts": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
    "ldap": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
    "smtp": {
        "visible": ["admin", "coordinator"],
        "editable": ["admin", "coordinator"],
    },
    "security": {
        "visible": ["admin"],
        "editable": ["admin"],
    },
}
def _build_context(db, user, saved=None):
    """Assemble the template context for ``settings.html``.

    Calls ``init_secrets_from_config`` first, then runs the read-only
    queries backing the page (Qualys counters, vCenters, allowed networks,
    Teams channels and rules, active contacts, server clusters) and
    computes, for the user's role, which sections are visible and editable.

    ``saved`` is passed through unchanged under the ``"saved"`` key.
    The caller is expected to add ``"request"`` before rendering.
    """

    def _scalar(sql):
        return db.execute(text(sql)).scalar()

    def _rows(sql):
        return db.execute(text(sql)).fetchall()

    init_secrets_from_config(db)
    role = user.get("role", "viewer")

    tag_count = _scalar("SELECT COUNT(*) FROM qualys_tags")
    asset_count = _scalar("SELECT COUNT(*) FROM qualys_assets")
    linked_count = _scalar("SELECT COUNT(*) FROM servers WHERE qualys_asset_id IS NOT NULL")
    vcenter_rows = _rows("SELECT * FROM vcenters ORDER BY name")
    network_rows = _rows("SELECT * FROM allowed_networks ORDER BY cidr")
    channel_rows = _rows("""
SELECT id, name, webhook_url, description, is_active, is_default,
sp_route, mode, is_reboot_channel, is_dynamic_dm, created_at
FROM teams_channels
ORDER BY is_default DESC, is_reboot_channel DESC, name
""")
    rule_rows = _rows("""
SELECT r.id, r.priority, r.name,
r.match_responsable_contact_id, r.match_application_domain,
r.match_env_in, r.match_msg_type_in, r.match_hostname_pattern,
r.match_is_database_server,
r.channel_id, r.is_active, r.created_at,
tc.name AS channel_name,
c.name AS responsable_name
FROM teams_channel_rules r
LEFT JOIN teams_channels tc ON tc.id = r.channel_id
LEFT JOIN contacts c ON c.id = r.match_responsable_contact_id
ORDER BY r.priority ASC, r.id ASC
""")
    contact_rows = _rows("""
SELECT id, name, teams_upn FROM contacts WHERE is_active = true ORDER BY name
""")
    cluster_rows = _rows("""
SELECT c.id, c.name, c.description, c.reboot_strategy, c.is_active, c.created_at,
COUNT(s.id) AS server_count
FROM server_clusters c
LEFT JOIN servers s ON s.cluster_id = c.id
GROUP BY c.id, c.name, c.description, c.reboot_strategy, c.is_active, c.created_at
ORDER BY c.name
""")

    # A section without an entry in SECTION_ACCESS is neither visible nor editable.
    def _role_allowed(section, kind):
        return section in SECTION_ACCESS and role in SECTION_ACCESS[section][kind]

    visible = {name: _role_allowed(name, "visible") for name in SECTIONS}
    editable = {name: _role_allowed(name, "editable") for name in SECTIONS}

    return {
        "user": user,
        "app_name": APP_NAME,
        "role": role,
        "sections": SECTIONS,
        "vals": _load_section_values(db),
        "allowed_nets": network_rows,
        "q_tags": tag_count,
        "q_assets": asset_count,
        "q_linked": linked_count,
        "vcenters": vcenter_rows,
        "saved": saved,
        "teams_channels": channel_rows,
        "teams_rules": rule_rows,
        "contacts_list": contact_rows,
        "server_clusters": cluster_rows,
        "visible": visible,
        "editable": editable,
    }
@router.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request, db=Depends(get_db)):
    """Render the settings page.

    Anonymous visitors are redirected to ``/login``; users without view
    permission on ``settings`` are redirected to ``/dashboard``.
    """
    user = get_current_user(request)
    if user:
        if can_view(get_user_perms(db, user), "settings"):
            context = {**_build_context(db, user), "request": request}
            return templates.TemplateResponse("settings.html", context)
        return RedirectResponse(url="/dashboard")
    return RedirectResponse(url="/login")
@router.post("/settings/{section}", response_class=HTMLResponse)
async def settings_save(request: Request, section: str, db=Depends(get_db)):
    """Persist the submitted fields of one settings section.

    Access requires an authenticated user with edit permission on
    ``settings`` whose role is listed as editor of ``section`` in
    SECTION_ACCESS. A section with no SECTION_ACCESS entry is refused
    (fail closed), matching how ``_build_context`` hides such sections.

    Returns 400 for an unknown section, a 303 redirect on any access
    failure, otherwise the re-rendered settings page.
    """
    # 303 (not the default 307) so the browser follows the redirect with GET
    # instead of replaying this POST against a GET-only page.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    if section not in SECTIONS:
        return HTMLResponse("<p>Section inconnue</p>", status_code=400)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    role = user.get("role", "viewer")
    access = SECTION_ACCESS.get(section)
    if access is None or role not in access["editable"]:
        return RedirectResponse(url="/settings", status_code=303)
    form = await request.form()
    for key, label, is_secret in SECTIONS[section]:
        val = form.get(key, "")
        # The mask placeholder means "unchanged": keep the stored secret.
        if is_secret and val == "********":
            continue
        if key.endswith(("_bypass_proxy", "_verify_ssl")):
            # Checkbox: absent from the form means "false".
            set_secret(db, key, "true" if val else "false", label)
        elif val:
            # Empty submissions never overwrite an existing value.
            set_secret(db, key, val, label)
    ctx = _build_context(db, user, saved=section)
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/ldap/test")
async def settings_ldap_test(request: Request, db=Depends(get_db)):
    """Run the LDAP service's connection test and return its result as JSON.

    Responds 401 when unauthenticated and 403 without edit permission on
    ``settings``.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        refusal = ("Non authentifié", 401)
    elif not can_edit(get_user_perms(db, user), "settings"):
        refusal = ("Permission refusée", 403)
    else:
        refusal = None
    if refusal is not None:
        message, status = refusal
        return JSONResponse({"ok": False, "msg": message}, status_code=status)

    # Imported only once access is granted, as in the original flow.
    from ..services.ldap_service import test_connection
    outcome = test_connection(db)
    return JSONResponse(outcome)
# --- vCenter CRUD ---
@router.post("/settings/vcenter/add", response_class=HTMLResponse)
async def vcenter_add(request: Request, db=Depends(get_db),
                      vc_name: str = Form(...), vc_endpoint: str = Form(...),
                      vc_datacenter: str = Form(""), vc_description: str = Form(""),
                      vc_responsable: str = Form("")):
    """Insert a vCenter row and re-render the settings page.

    Empty optional fields are stored as NULL. Access failures answer with
    a 303 redirect so the browser issues a GET rather than replaying the
    POST (the default 307 would preserve the method).
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    db.execute(text(
        "INSERT INTO vcenters (name, endpoint, datacenter, description, responsable) VALUES (:n, :e, :dc, :desc, :resp)"
    ), {"n": vc_name, "e": vc_endpoint, "dc": vc_datacenter or None,
        "desc": vc_description or None, "resp": vc_responsable or None})
    db.commit()
    ctx = _build_context(db, user, saved="vsphere")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/vcenter/{vc_id}/delete", response_class=HTMLResponse)
async def vcenter_delete(request: Request, vc_id: int, db=Depends(get_db)):
    """Deactivate a vCenter (sets ``is_active = false``; the row is kept).

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    db.execute(text("UPDATE vcenters SET is_active = false WHERE id = :id"), {"id": vc_id})
    db.commit()
    ctx = _build_context(db, user, saved="vsphere")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
# --- Teams channels CRUD ---
@router.post("/settings/teams-channel/add", response_class=HTMLResponse)
async def teams_channel_add(request: Request, db=Depends(get_db),
                            tc_name: str = Form(...),
                            tc_mode: str = Form("sharepoint"),
                            tc_sp_route: str = Form(""),
                            tc_webhook_url: str = Form(""),
                            tc_description: str = Form(""),
                            tc_is_default: str = Form(""),
                            tc_is_reboot_channel: str = Form(""),
                            tc_is_dynamic_dm: str = Form("")):
    """Create a Teams channel and re-render the settings page.

    An unrecognised ``tc_mode`` falls back to ``"sharepoint"``. The flag
    fields are checkbox-style: any non-empty value means true. When the
    new channel is flagged default (or reboot channel), that flag is
    cleared on existing channels first.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    mode = tc_mode if tc_mode in ("sharepoint", "webhook") else "sharepoint"
    is_def = bool(tc_is_default)
    is_reboot = bool(tc_is_reboot_channel)
    is_dyn_dm = bool(tc_is_dynamic_dm)
    if is_def:
        db.execute(text("UPDATE teams_channels SET is_default=false WHERE is_default=true"))
    if is_reboot:
        db.execute(text("UPDATE teams_channels SET is_reboot_channel=false WHERE is_reboot_channel=true"))
    db.execute(text("""
INSERT INTO teams_channels (name, mode, sp_route, webhook_url, description,
is_default, is_reboot_channel, is_dynamic_dm)
VALUES (:n, :m, :sp, :u, :d, :df, :rb, :dyn)
"""), {"n": tc_name.strip(), "m": mode,
       "sp": (tc_sp_route.strip() or None),
       "u": (tc_webhook_url.strip() or None),
       "d": (tc_description or None),
       "df": is_def, "rb": is_reboot, "dyn": is_dyn_dm})
    db.commit()
    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/teams-channel/{tc_id}/edit", response_class=HTMLResponse)
async def teams_channel_edit(request: Request, tc_id: int, db=Depends(get_db),
                             tc_name: str = Form(...),
                             tc_mode: str = Form("sharepoint"),
                             tc_sp_route: str = Form(""),
                             tc_webhook_url: str = Form(""),
                             tc_description: str = Form(""),
                             tc_is_active: str = Form(""),
                             tc_is_default: str = Form(""),
                             tc_is_reboot_channel: str = Form(""),
                             tc_is_dynamic_dm: str = Form("")):
    """Update a Teams channel and re-render the settings page.

    An unrecognised ``tc_mode`` falls back to ``"sharepoint"``. The flag
    fields are checkbox-style: any non-empty value means true. When this
    channel is flagged default (or reboot channel), that flag is cleared
    on every other channel first.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    mode = tc_mode if tc_mode in ("sharepoint", "webhook") else "sharepoint"
    is_def = bool(tc_is_default)
    is_act = bool(tc_is_active)
    is_reboot = bool(tc_is_reboot_channel)
    is_dyn_dm = bool(tc_is_dynamic_dm)
    if is_def:
        db.execute(text("UPDATE teams_channels SET is_default=false WHERE is_default=true AND id<>:id"),
                   {"id": tc_id})
    if is_reboot:
        db.execute(text("UPDATE teams_channels SET is_reboot_channel=false WHERE is_reboot_channel=true AND id<>:id"),
                   {"id": tc_id})
    db.execute(text("""
UPDATE teams_channels
SET name=:n, mode=:m, sp_route=:sp, webhook_url=:u, description=:d,
is_default=:df, is_active=:ia,
is_reboot_channel=:rb, is_dynamic_dm=:dyn
WHERE id=:id
"""), {"n": tc_name.strip(), "m": mode,
       "sp": (tc_sp_route.strip() or None),
       "u": (tc_webhook_url.strip() or None),
       "d": (tc_description or None),
       "df": is_def, "ia": is_act,
       "rb": is_reboot, "dyn": is_dyn_dm, "id": tc_id})
    db.commit()
    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/teams-channel/{tc_id}/delete", response_class=HTMLResponse)
async def teams_channel_delete(request: Request, tc_id: int, db=Depends(get_db)):
    """Delete a Teams channel row and re-render the settings page.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    db.execute(text("DELETE FROM teams_channels WHERE id=:id"), {"id": tc_id})
    db.commit()
    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/teams-channel/{tc_id}/test")
async def teams_channel_test(request: Request, tc_id: int, db=Depends(get_db)):
    """Send a test notification through one Teams channel; answer in JSON.

    Behaviour depends on the channel's mode:
    - ``sharepoint``: calls ``write_sharepoint_notification`` with the
      configured ``sharepoint_notif_path`` and the channel's ``sp_route``.
    - ``webhook``: calls ``send_test_message`` with the channel's
      ``webhook_url``.

    Error statuses: 401 unauthenticated, 403 no edit permission, 404
    unknown channel, 400 missing configuration or unknown mode, 500 when
    the underlying service call raises.
    """
    from fastapi.responses import JSONResponse

    def _error(message, status):
        return JSONResponse({"ok": False, "msg": message}, status_code=status)

    user = get_current_user(request)
    if not user:
        return _error("Non authentifié", 401)
    if not can_edit(get_user_perms(db, user), "settings"):
        return _error("Permission refusée", 403)

    channel = db.execute(text("""
SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
FROM teams_channels WHERE id=:id
"""), {"id": tc_id}).fetchone()
    if not channel:
        return _error("Canal introuvable", 404)

    sender = user.get("username") or "PatchCenter"
    try:
        if channel.mode == "webhook":
            if not channel.webhook_url:
                return _error("webhook_url non défini", 400)
            from ..services.teams_service import send_test_message
            outcome = send_test_message(channel.webhook_url, channel.name, sender)
            return JSONResponse(outcome)

        if channel.mode != "sharepoint":
            return _error(f"Mode inconnu: {channel.mode}", 400)

        from ..services.teams_service import write_sharepoint_notification
        base_path = (get_secret(db, "sharepoint_notif_path") or "").strip()
        if not base_path:
            return _error(
                "sharepoint_notif_path non configuré (Settings > Teams > Notifications)",
                400)
        if not channel.sp_route:
            return _error("sp_route non défini sur ce canal", 400)

        recipient = None
        if channel.is_dynamic_dm:
            # Test mode: target the current user's own email when it is known,
            # otherwise a placeholder address.
            account = db.execute(text("""
SELECT email FROM users WHERE username=:u OR email=:u LIMIT 1
"""), {"u": sender}).fetchone()
            recipient = account.email if account else "test.user@example.com"

        outcome = write_sharepoint_notification(
            sp_base=base_path, sp_route=channel.sp_route, msg_type="debut",
            server_name=f"TEST-PATCHCENTER-{channel.name}", intervenant=sender,
            dynamic_to_email=recipient,
        )
        return JSONResponse(outcome)
    except Exception as e:
        return _error(f"Erreur: {e}", 500)
# --- Teams channel rules CRUD ---
def _parse_csv_text_array(s: str):
"""'a, b ,c' -> ['a','b','c'] ; vide -> None."""
if not s:
return None
items = [p.strip() for p in s.split(",")]
items = [p for p in items if p]
return items or None
def _parse_db_filter(s: str):
"""'true'/'1'/'yes' -> True ; 'false'/'0'/'no' -> False ; '' -> None (pas filtré)."""
if s is None: return None
s = s.strip().lower()
if s in ("", "any", "null", "none", "-"): return None
if s in ("true", "1", "yes", "y", "oui", "db"): return True
if s in ("false", "0", "no", "n", "non", "nondb", "non-db"): return False
return None
@router.post("/settings/teams-rule/add", response_class=HTMLResponse)
async def teams_rule_add(request: Request, db=Depends(get_db),
                         tr_priority: int = Form(100),
                         tr_name: str = Form(...),
                         tr_channel_id: int = Form(...),
                         tr_match_responsable_contact_id: str = Form(""),
                         tr_match_application_domain: str = Form(""),
                         tr_match_env_in: str = Form(""),
                         tr_match_msg_type_in: str = Form(""),
                         tr_match_hostname_pattern: str = Form(""),
                         tr_match_is_database_server: str = Form("")):
    """Create a Teams routing rule and re-render the settings page.

    Blank match fields are stored as NULL. ``tr_match_env_in`` and
    ``tr_match_msg_type_in`` are comma-separated lists;
    ``tr_match_is_database_server`` is parsed as a tri-state filter.

    Returns 400 when ``tr_match_responsable_contact_id`` is neither blank
    nor an integer (previously an unhandled ``ValueError``). Access
    failures answer with a 303 redirect so the browser issues a GET rather
    than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    raw_resp = tr_match_responsable_contact_id.strip()
    try:
        resp_id = int(raw_resp) if raw_resp else None
    except ValueError:
        # Reject rather than silently dropping the filter, which would
        # widen the rule to every contact.
        return HTMLResponse("<p>Responsable invalide</p>", status_code=400)
    db.execute(text("""
INSERT INTO teams_channel_rules
(priority, name, channel_id,
match_responsable_contact_id, match_application_domain,
match_env_in, match_msg_type_in, match_hostname_pattern,
match_is_database_server)
VALUES (:p, :n, :ch, :rc, :dom, :env, :mt, :host, :db)
"""), {
        "p": tr_priority, "n": tr_name.strip(), "ch": tr_channel_id,
        "rc": resp_id,
        "dom": (tr_match_application_domain.strip() or None),
        "env": _parse_csv_text_array(tr_match_env_in),
        "mt": _parse_csv_text_array(tr_match_msg_type_in),
        "host": (tr_match_hostname_pattern.strip() or None),
        "db": _parse_db_filter(tr_match_is_database_server),
    })
    db.commit()
    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/teams-rule/{tr_id}/edit", response_class=HTMLResponse)
async def teams_rule_edit(request: Request, tr_id: int, db=Depends(get_db),
                          tr_priority: int = Form(100),
                          tr_name: str = Form(...),
                          tr_channel_id: int = Form(...),
                          tr_match_responsable_contact_id: str = Form(""),
                          tr_match_application_domain: str = Form(""),
                          tr_match_env_in: str = Form(""),
                          tr_match_msg_type_in: str = Form(""),
                          tr_match_hostname_pattern: str = Form(""),
                          tr_match_is_database_server: str = Form(""),
                          tr_is_active: str = Form("")):
    """Update a Teams routing rule and re-render the settings page.

    Blank match fields are stored as NULL. ``tr_match_env_in`` and
    ``tr_match_msg_type_in`` are comma-separated lists;
    ``tr_match_is_database_server`` is parsed as a tri-state filter;
    ``tr_is_active`` is checkbox-style (any non-empty value means true).

    Returns 400 when ``tr_match_responsable_contact_id`` is neither blank
    nor an integer (previously an unhandled ``ValueError``). Access
    failures answer with a 303 redirect so the browser issues a GET rather
    than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    raw_resp = tr_match_responsable_contact_id.strip()
    try:
        resp_id = int(raw_resp) if raw_resp else None
    except ValueError:
        # Reject rather than silently dropping the filter, which would
        # widen the rule to every contact.
        return HTMLResponse("<p>Responsable invalide</p>", status_code=400)
    db.execute(text("""
UPDATE teams_channel_rules
SET priority=:p, name=:n, channel_id=:ch,
match_responsable_contact_id=:rc,
match_application_domain=:dom,
match_env_in=:env, match_msg_type_in=:mt,
match_hostname_pattern=:host,
match_is_database_server=:db,
is_active=:ia
WHERE id=:id
"""), {
        "p": tr_priority, "n": tr_name.strip(), "ch": tr_channel_id,
        "rc": resp_id,
        "dom": (tr_match_application_domain.strip() or None),
        "env": _parse_csv_text_array(tr_match_env_in),
        "mt": _parse_csv_text_array(tr_match_msg_type_in),
        "host": (tr_match_hostname_pattern.strip() or None),
        "db": _parse_db_filter(tr_match_is_database_server),
        "ia": bool(tr_is_active), "id": tr_id,
    })
    db.commit()
    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/teams-rule/{tr_id}/delete", response_class=HTMLResponse)
async def teams_rule_delete(request: Request, tr_id: int, db=Depends(get_db)):
    """Delete a Teams routing rule and re-render the settings page.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    db.execute(text("DELETE FROM teams_channel_rules WHERE id=:id"), {"id": tr_id})
    db.commit()
    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/teams-rule/test")
async def teams_rule_test(request: Request, db=Depends(get_db),
                          server_id: int = Form(...),
                          msg_type: str = Form("debut")):
    """Return, as JSON, what ``resolve_channels_for_server`` yields for
    ``(server_id, msg_type)``.

    Per the original description this is a dry run: it reports which
    channels would be notified and why, without sending anything.
    Responds 401 when unauthenticated and 403 without view permission on
    ``settings``.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    allowed = can_view(get_user_perms(db, user), "settings")
    if not allowed:
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.teams_service import resolve_channels_for_server
    return JSONResponse(resolve_channels_for_server(db, server_id, msg_type))
# --- Server clusters CRUD ---
@router.post("/settings/server-cluster/add", response_class=HTMLResponse)
async def server_cluster_add(request: Request, db=Depends(get_db),
                             sc_name: str = Form(...),
                             sc_description: str = Form(""),
                             sc_reboot_strategy: str = Form("sequential")):
    """Create a server cluster and re-render the settings page.

    An unrecognised reboot strategy falls back to ``"sequential"``. A
    failed INSERT is rolled back and logged; the page is still rendered.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    strat = sc_reboot_strategy if sc_reboot_strategy in ("sequential", "parallel") else "sequential"
    try:
        db.execute(text("""
INSERT INTO server_clusters (name, description, reboot_strategy)
VALUES (:n, :d, :s)
"""), {"n": sc_name.strip(), "d": (sc_description or None), "s": strat})
        db.commit()
    except Exception as e:
        db.rollback()
        # Probably a unique-constraint violation on name.
        logger.warning("server_cluster_add failed: %s", e)
    ctx = _build_context(db, user, saved="cluster")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/server-cluster/{sc_id}/edit", response_class=HTMLResponse)
async def server_cluster_edit(request: Request, sc_id: int, db=Depends(get_db),
                              sc_name: str = Form(...),
                              sc_description: str = Form(""),
                              sc_reboot_strategy: str = Form("sequential"),
                              sc_is_active: str = Form("")):
    """Update a server cluster and re-render the settings page.

    An unrecognised reboot strategy falls back to ``"sequential"``;
    ``sc_is_active`` is checkbox-style (any non-empty value means true).
    A failed UPDATE is rolled back and logged, the same way
    ``server_cluster_add`` handles a failed INSERT, so the page can still
    be rendered on a usable session.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    strat = sc_reboot_strategy if sc_reboot_strategy in ("sequential", "parallel") else "sequential"
    try:
        db.execute(text("""
UPDATE server_clusters
SET name=:n, description=:d, reboot_strategy=:s, is_active=:ia
WHERE id=:id
"""), {"n": sc_name.strip(), "d": (sc_description or None), "s": strat,
       "ia": bool(sc_is_active), "id": sc_id})
        db.commit()
    except Exception as e:
        db.rollback()
        # Probably a unique-constraint violation on name.
        logger.warning("server_cluster_edit failed: %s", e)
    ctx = _build_context(db, user, saved="cluster")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/server-cluster/{sc_id}/delete", response_class=HTMLResponse)
async def server_cluster_delete(request: Request, sc_id: int, db=Depends(get_db)):
    """Delete a server cluster, or deactivate it when servers still use it.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    # Check whether any server is still attached to the cluster.
    cnt = db.execute(text("SELECT COUNT(*) FROM servers WHERE cluster_id=:id"),
                     {"id": sc_id}).scalar() or 0
    if cnt > 0:
        # Deactivate only, so foreign keys and history are not broken.
        db.execute(text("UPDATE server_clusters SET is_active=false WHERE id=:id"), {"id": sc_id})
    else:
        db.execute(text("DELETE FROM server_clusters WHERE id=:id"), {"id": sc_id})
    db.commit()
    ctx = _build_context(db, user, saved="cluster")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.get("/settings/server-cluster/{sc_id}/servers")
async def server_cluster_servers(request: Request, sc_id: int, db=Depends(get_db)):
    """Return the servers of one cluster as JSON for the detail panel.

    Rows are ordered by ``cluster_order`` (NULLs last) then hostname.
    Responds 401 when unauthenticated and 403 without view permission on
    ``settings``.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    if not can_view(get_user_perms(db, user), "settings"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    members = db.execute(text("""
SELECT id, hostname, cluster_order, machine_type, etat
FROM servers
WHERE cluster_id = :id
ORDER BY cluster_order NULLS LAST, hostname
"""), {"id": sc_id}).fetchall()

    servers = []
    for member in members:
        servers.append({
            "id": member.id,
            "hostname": str(member.hostname),
            "order": member.cluster_order,
            "machine_type": member.machine_type,
            "etat": member.etat,
        })
    payload = {"ok": True, "count": len(members), "servers": servers}
    return JSONResponse(payload)
# --- Individual secret ---
@router.post("/settings/secret/update", response_class=HTMLResponse)
async def secret_update(request: Request, db=Depends(get_db),
                        secret_key: str = Form(...), secret_value: str = Form(...)):
    """Update a single secret by key and re-render the settings page.

    An empty value or the mask placeholder leaves the stored secret
    untouched. The existing description is kept; a key with no row yet
    uses the key itself as description.

    When ``secret_key`` is declared in SECTIONS, the caller's role must be
    an editor of the owning section per SECTION_ACCESS, so this endpoint
    cannot be used to bypass the per-section rules that ``settings_save``
    enforces. Keys not declared in SECTIONS only need the general edit
    permission, as before.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    # Find which section, if any, declares this key.
    owning_section = next(
        (name for name, fields in SECTIONS.items()
         if any(key == secret_key for key, _label, _is_secret in fields)),
        None,
    )
    if owning_section is not None:
        role = user.get("role", "viewer")
        allowed_roles = SECTION_ACCESS.get(owning_section, {}).get("editable", [])
        if role not in allowed_roles:
            return RedirectResponse(url="/settings", status_code=303)
    if secret_value and secret_value != "********":
        # Reuse the existing description.
        existing = db.execute(text("SELECT description FROM app_secrets WHERE key = :k"),
                              {"k": secret_key}).fetchone()
        desc = existing.description if existing else secret_key
        set_secret(db, secret_key, secret_value, desc)
    ctx = _build_context(db, user, saved="secret")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
# --- Allowed networks ---
def _regen_nginx_acl(db):
    """Regenerate the nginx ACL file from the active allowed networks.

    Writes one ``allow <cidr>;`` line per active row followed by
    ``deny all;``, validates the configuration with ``nginx -t`` and then
    reloads nginx.

    Hardening over the previous version:
    - rows that are not a valid IP address/network are skipped and logged
      instead of being interpolated verbatim into the nginx configuration;
    - if ``nginx -t`` rejects the new file, the previous file content is
      restored so a later nginx restart does not hit a broken include;
    - a failed reload now counts as a failure;
    - failures are logged instead of being swallowed silently.

    Returns True on success, False on any failure (never raises).
    """
    import ipaddress
    import subprocess

    acl_path = "/etc/nginx/patchcenter_acl.conf"
    nets = db.execute(text("SELECT cidr FROM allowed_networks WHERE is_active = true ORDER BY cidr")).fetchall()
    lines = ["# Généré par PatchCenter — ne pas éditer manuellement"]
    for n in nets:
        cidr = str(n.cidr).strip()
        try:
            ipaddress.ip_network(cidr, strict=False)
        except ValueError:
            logger.warning("nginx ACL: skipping invalid network %r", cidr)
            continue
        lines.append(f"allow {cidr};")
    lines.append("deny all;")
    try:
        # Keep the current content (as raw bytes) so it can be restored.
        try:
            with open(acl_path, "rb") as f:
                previous = f.read()
        except FileNotFoundError:
            previous = None
        with open(acl_path, "w") as f:
            f.write("\n".join(lines) + "\n")
        try:
            subprocess.run(["nginx", "-t"], capture_output=True, check=True)
        except (subprocess.CalledProcessError, OSError):
            if previous is not None:
                with open(acl_path, "wb") as f:
                    f.write(previous)
            raise
        subprocess.run(["systemctl", "reload", "nginx"], capture_output=True, check=True)
        return True
    except Exception:
        logger.exception("nginx ACL regeneration failed")
        return False
@router.post("/settings/network/add")
async def network_add(request: Request, db=Depends(get_db),
                      cidr: str = Form(...), description: str = Form("")):
    """Add an allowed network, regenerate the nginx ACL, re-render the page.

    ``cidr`` must parse as an IP address or network (host bits allowed);
    anything else is rejected with 400. The value ends up in an nginx
    ``allow`` directive, so it must never be stored unvalidated.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    import ipaddress

    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    cidr_clean = cidr.strip()
    try:
        ipaddress.ip_network(cidr_clean, strict=False)
    except ValueError:
        logger.warning("network_add rejected invalid CIDR: %r", cidr_clean)
        return HTMLResponse("<p>CIDR invalide</p>", status_code=400)
    db.execute(text("INSERT INTO allowed_networks (cidr, description) VALUES (:c, :d)"),
               {"c": cidr_clean, "d": description or None})
    db.commit()
    if not _regen_nginx_acl(db):
        logger.warning("network_add: nginx ACL was not regenerated")
    ctx = _build_context(db, user, saved="security")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/network/{net_id}/delete")
async def network_delete(request: Request, net_id: int, db=Depends(get_db)):
    """Delete an allowed network, regenerate the nginx ACL, re-render the page.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    db.execute(text("DELETE FROM allowed_networks WHERE id = :id"), {"id": net_id})
    db.commit()
    _regen_nginx_acl(db)
    ctx = _build_context(db, user, saved="security")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
@router.post("/settings/network/{net_id}/toggle")
async def network_toggle(request: Request, net_id: int, db=Depends(get_db)):
    """Flip ``is_active`` on an allowed network, regenerate the nginx ACL
    and re-render the page.

    Access failures answer with a 303 redirect so the browser issues a GET
    rather than replaying the POST.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)
    db.execute(text("UPDATE allowed_networks SET is_active = NOT is_active WHERE id = :id"), {"id": net_id})
    db.commit()
    _regen_nginx_acl(db)
    ctx = _build_context(db, user, saved="security")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)