- Endpoint POST /settings/smtp/test (Form 'recipient', defaut kalid.moutaouakil@gmail.com) envoie un mail HTML pro confirmant que SMTP fonctionne (header bleu degrade, metadonnees envoyeur/date/destinataire en card) - UI Settings > SMTP: champ destinataire test pre-rempli + bouton 'Envoyer test' AJAX sans reload, status vert/rouge en dessous - Reuse send_html_mail du mail_service
832 lines
38 KiB
Python
832 lines
38 KiB
Python
import logging
|
|
logger = logging.getLogger(__name__)
|
|
"""Router settings — configuration modules externes + connexions"""
|
|
from fastapi import APIRouter, Request, Depends, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit
|
|
from ..services.secrets_service import get_secret, set_secret, list_secrets, init_secrets_from_config
|
|
from ..config import APP_NAME
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
# Settings form definition, grouped by section name.
# Every field is a (secret_key, label, is_secret) tuple:
#   - secret_key is the key used with get_secret / set_secret,
#   - label is passed to set_secret as the description when the value is saved,
#   - is_secret=True values are masked ("********") before reaching the template.
SECTIONS = {
    # --- Qualys API ---
    "qualys": [
        ("qualys_url", "URL API", False),
        ("qualys_user", "Utilisateur", False),
        ("qualys_pass", "Mot de passe", True),
        ("qualys_proxy", "Proxy", False),
        ("qualys_bypass_proxy", "Bypass proxy (true/false)", False),
    ],
    # --- iTop API ---
    "itop": [
        ("itop_url", "URL API", False),
        ("itop_user", "Utilisateur", False),
        ("itop_pass", "Mot de passe", True),
    ],
    # --- SSH with private key ---
    "ssh_key": [
        ("ssh_key_default_user", "User SSH par defaut", False),
        ("ssh_key_default_port", "Port par defaut", False),
        ("ssh_key_private_key", "Cle privee (PEM)", True),
    ],
    # --- SSH with password ---
    "ssh_pwd": [
        ("ssh_pwd_default_user", "User par defaut", False),
        ("ssh_pwd_default_pass", "Password par defaut", True),
    ],
    # --- SSH through PSMP ---
    "ssh_psmp": [
        ("psmp_host", "Adresse PSMP", False),
        ("psmp_port", "Port PSMP", False),
        ("psmp_user_format", "Format user", False),
        ("psmp_cyberark_user", "Compte CyberArk", False),
        ("psmp_target_user", "Utilisateur cible", False),
        ("psmp_default_safe", "Safe par defaut", False),
    ],
    # --- RDP through PSM / PVWA ---
    "rdp_psm": [
        ("rdp_psm_pvwa_url", "URL PVWA", False),
        ("rdp_psm_pvwa_user", "User PVWA", False),
        ("rdp_psm_pvwa_pass", "Password PVWA", True),
        ("rdp_psm_component", "Connection Component", False),
    ],
    # --- RDP with password ---
    "rdp_pwd": [
        ("rdp_pwd_default_user", "User par defaut", False),
        ("rdp_pwd_default_pass", "Password par defaut", True),
        ("rdp_pwd_default_port", "Port RDP", False),
    ],
    # --- vSphere / vCenter credentials ---
    "vsphere": [
        ("vsphere_user", "Utilisateur vCenter", False),
        ("vsphere_pass", "Mot de passe vCenter", True),
    ],
    # --- Application security settings ---
    "security": [
        ("security_session_timeout", "Timeout session (minutes)", False),
        ("security_max_login_attempts", "Max tentatives login", False),
    ],
    # --- Splunk HEC ---
    "splunk": [
        ("splunk_hec_url", "URL HEC", False),
        ("splunk_hec_token", "Token HEC", True),
        ("splunk_index", "Index", False),
        ("splunk_sourcetype", "Sourcetype", False),
        ("splunk_verify_ssl", "Verifier SSL (true/false)", False),
    ],
    # --- Teams / SharePoint notifications ---
    "teams": [
        ("sharepoint_notif_path", "Chemin local OneDrive sync (ex: C:\\Users\\xxx\\sanefgroupe...\\notifications)", False),
        ("teams_webhook_url", "Webhook URL globale (legacy, futur OAuth)", False),
        ("teams_sp_site_url", "SharePoint Site URL (futur webhook)", False),
        ("teams_sp_library", "SharePoint Library (futur webhook)", False),
        ("teams_sp_folder", "SharePoint Folder (futur webhook)", False),
        ("teams_sp_client_id", "App Client ID (futur webhook OAuth)", False),
        ("teams_sp_client_secret", "App Client Secret (futur webhook OAuth)", True),
        ("teams_sp_tenant_id", "Tenant ID (futur webhook OAuth)", False),
    ],
    # --- Outgoing mail ---
    "smtp": [
        ("smtp_host", "Serveur SMTP (ex: vpdsismtp1.sanef.groupe)", False),
        ("smtp_port", "Port SMTP (25 / 465 / 587)", False),
        ("smtp_user", "User SMTP (vide si relay anonyme)", False),
        ("smtp_pass", "Password SMTP (vide si relay anonyme)", True),
        ("smtp_from", "From (ex: patchcenter@sanef.com)", False),
        ("smtp_use_tls", "Utiliser TLS/STARTTLS (true/false)", False),
        ("smtp_pct_recipient", "Destinataire prévenance PCT (ex: PCT.reims@sanef.com)", False),
    ],
    # --- LDAP / Active Directory authentication ---
    "ldap": [
        ("ldap_enabled", "Activer LDAP/AD (true/false)", False),
        ("ldap_server", "Serveur (ex: ldaps://ad.sanef.com:636)", False),
        ("ldap_base_dn", "Base DN (ex: DC=sanef,DC=com)", False),
        ("ldap_bind_dn", "Compte de bind (DN complet)", False),
        ("ldap_bind_pwd", "Mot de passe compte de bind", True),
        ("ldap_user_filter", "Filtre user (ex: (sAMAccountName={username}))", False),
        ("ldap_email_attr", "Attribut email (ex: mail)", False),
        ("ldap_name_attr", "Attribut nom affiché (ex: displayName)", False),
        ("ldap_tls", "TLS (true/false)", False),
        ("ldap_required_group", "Groupe AD autorise (DN complet, vide = tous)", False),
        ("ldap_default_role", "Role par defaut auto-provision (admin/operator/viewer)", False),
    ],
    # --- iTop contact synchronisation ---
    "itop_contacts": [
        ("itop_contact_teams", "Teams iTop à synchroniser (séparées par ,)", False),
    ],
}
|
|
|
|
|
|
def _load_section_values(db):
    """Return the current value of every field declared in SECTIONS.

    The result maps each secret key to the string handed to the template:
    an empty string when nothing is stored, the "********" placeholder when
    the field is flagged as secret and has a value, and the stored value
    otherwise.
    """
    display = {}
    for fields in SECTIONS.values():
        for key, _label, is_secret in fields:
            stored = get_secret(db, key)
            if not stored:
                display[key] = ""
            elif is_secret:
                # Never send a real secret back to the browser.
                display[key] = "********"
            else:
                display[key] = stored
    return display
|
|
|
|
|
|
# Per-section access rules, keyed by section name:
#   "visible"  = roles allowed to see the section,
#   "editable" = roles allowed to modify it.
# An empty list means no role has that right.
SECTION_ACCESS = {
    "qualys":        {"visible": ["admin"], "editable": ["admin"]},
    "ssh_key":       {"visible": ["admin"], "editable": ["admin"]},
    "ssh_pwd":       {"visible": ["admin", "operator"], "editable": ["admin", "operator"]},
    "ssh_psmp":      {"visible": ["admin", "operator"], "editable": ["admin", "operator"]},
    "rdp_psm":       {"visible": ["admin"], "editable": ["admin"]},
    "rdp_pwd":       {"visible": [], "editable": []},
    "vsphere":       {"visible": ["admin", "operator", "coordinator"], "editable": ["admin", "operator"]},
    "splunk":        {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]},
    "teams":         {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]},
    "itop":          {"visible": ["admin"], "editable": ["admin"]},
    "itop_contacts": {"visible": ["admin"], "editable": ["admin"]},
    "ldap":          {"visible": ["admin"], "editable": ["admin"]},
    "smtp":          {"visible": ["admin", "coordinator"], "editable": ["admin", "coordinator"]},
    "security":      {"visible": ["admin"], "editable": ["admin"]},
}
|
|
|
|
|
|
def _build_context(db, user, saved=None):
    """Assemble the template context shared by every settings view.

    Args:
        db: database session used for all the queries below.
        user: dict describing the current user; its "role" entry
            (default "viewer") drives the per-section visibility flags.
        saved: optional marker forwarded to the template under the "saved" key.

    Returns:
        dict with the user, counters, table rows, masked setting values and
        the per-section "visible" / "editable" flags. The caller adds "request".
    """
    init_secrets_from_config(db)
    role = user.get("role", "viewer")

    def _scalar(sql):
        return db.execute(text(sql)).scalar()

    def _rows(sql):
        return db.execute(text(sql)).fetchall()

    def _allowed(section, right):
        # A section without an access entry is granted to nobody.
        return section in SECTION_ACCESS and role in SECTION_ACCESS[section][right]

    # Qualys counters.
    q_tags = _scalar("SELECT COUNT(*) FROM qualys_tags")
    q_assets = _scalar("SELECT COUNT(*) FROM qualys_assets")
    q_linked = _scalar("SELECT COUNT(*) FROM servers WHERE qualys_asset_id IS NOT NULL")

    vcenters = _rows("SELECT * FROM vcenters ORDER BY name")
    allowed_nets = _rows("SELECT * FROM allowed_networks ORDER BY cidr")

    teams_channels = _rows("""
        SELECT id, name, webhook_url, description, is_active, is_default,
               sp_route, mode, is_reboot_channel, is_dynamic_dm, created_at
        FROM teams_channels
        ORDER BY is_default DESC, is_reboot_channel DESC, name
    """)
    teams_rules = _rows("""
        SELECT r.id, r.priority, r.name,
               r.match_responsable_contact_id, r.match_application_domain,
               r.match_env_in, r.match_msg_type_in, r.match_hostname_pattern,
               r.match_is_database_server,
               r.channel_id, r.is_active, r.created_at,
               tc.name AS channel_name,
               c.name AS responsable_name
        FROM teams_channel_rules r
        LEFT JOIN teams_channels tc ON tc.id = r.channel_id
        LEFT JOIN contacts c ON c.id = r.match_responsable_contact_id
        ORDER BY r.priority ASC, r.id ASC
    """)
    contacts_list = _rows("""
        SELECT id, name, teams_upn FROM contacts WHERE is_active = true ORDER BY name
    """)
    server_clusters = _rows("""
        SELECT c.id, c.name, c.description, c.reboot_strategy, c.is_active, c.created_at,
               COUNT(s.id) AS server_count
        FROM server_clusters c
        LEFT JOIN servers s ON s.cluster_id = c.id
        GROUP BY c.id, c.name, c.description, c.reboot_strategy, c.is_active, c.created_at
        ORDER BY c.name
    """)

    # Section flags for the current role.
    visible = {name: _allowed(name, "visible") for name in SECTIONS}
    editable = {name: _allowed(name, "editable") for name in SECTIONS}

    return {
        "user": user,
        "app_name": APP_NAME,
        "role": role,
        "sections": SECTIONS,
        "vals": _load_section_values(db),
        "allowed_nets": allowed_nets,
        "q_tags": q_tags,
        "q_assets": q_assets,
        "q_linked": q_linked,
        "vcenters": vcenters,
        "saved": saved,
        "teams_channels": teams_channels,
        "teams_rules": teams_rules,
        "contacts_list": contacts_list,
        "server_clusters": server_clusters,
        "visible": visible,
        "editable": editable,
    }
|
|
|
|
|
|
@router.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request, db=Depends(get_db)):
    """Render the settings page.

    Anonymous visitors are redirected to /login; users without the "settings"
    view permission are redirected to /dashboard.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_view(get_user_perms(db, user), "settings"):
        return RedirectResponse(url="/dashboard")
    context = _build_context(db, user)
    context["request"] = request
    return templates.TemplateResponse("settings.html", context)
|
|
|
|
|
|
@router.post("/settings/{section}", response_class=HTMLResponse)
async def settings_save(request: Request, section: str, db=Depends(get_db)):
    """Save the fields of one settings section, then re-render the page.

    Args:
        request: incoming request carrying the submitted form.
        section: name of the section; must be a key of SECTIONS.
        db: database session.

    Returns:
        - a 303 redirect to /login when nobody is authenticated,
        - a 400 HTML response when the section is unknown,
        - a 303 redirect to /settings when the user lacks the "settings" edit
          permission or their role may not edit this section,
        - otherwise the settings page with ``saved`` set to the section name.
    """
    # 303 (not the default 307) so the browser follows the redirect with a
    # GET instead of replaying this POST against /login or /settings.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    if section not in SECTIONS:
        return HTMLResponse("<p>Section inconnue</p>", status_code=400)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    # Fail closed: a section with no SECTION_ACCESS entry is editable by
    # nobody, matching how _build_context computes the "editable" flags.
    role = user.get("role", "viewer")
    access = SECTION_ACCESS.get(section)
    if access is None or role not in access["editable"]:
        return RedirectResponse(url="/settings", status_code=303)

    form = await request.form()
    for key, label, is_secret in SECTIONS[section]:
        val = form.get(key, "")
        # The masking placeholder means "secret left untouched".
        if is_secret and val == "********":
            continue
        if key.endswith(("_bypass_proxy", "_verify_ssl")):
            # Checkbox: a field absent from the form means "false".
            val = "true" if val else "false"
            set_secret(db, key, val, label)
        elif val:
            # Empty submissions never overwrite a stored value.
            set_secret(db, key, val, label)

    ctx = _build_context(db, user, saved=section)
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/smtp/test")
async def settings_smtp_test(request: Request, db=Depends(get_db),
                             recipient: str = Form("kalid.moutaouakil@gmail.com")):
    """Send a test HTML mail to ``recipient`` to validate the SMTP configuration.

    Args:
        request: incoming request, used to identify the current user.
        db: database session, forwarded to the mail service.
        recipient: destination address taken from the submitted form.

    Returns:
        JSONResponse: 401 when unauthenticated, 403 without the "settings"
        edit permission, 400 when the recipient is empty or contains a line
        break, otherwise whatever ``send_html_mail`` returns.
    """
    from datetime import datetime as _dt
    from html import escape as _escape
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    to = (recipient or "").strip()
    if not to:
        return JSONResponse({"ok": False, "msg": "Destinataire vide"}, status_code=400)
    # A line break inside an address could be used to inject mail headers.
    if "\r" in to or "\n" in to:
        return JSONResponse({"ok": False, "msg": "Destinataire invalide"}, status_code=400)

    sender = user.get("sub") or user.get("username") or "PatchCenter"
    now_str = _dt.now().strftime("%d/%m/%Y %H:%M:%S")

    # Both values originate from the request/session: escape them before
    # embedding them in the HTML body.
    sender_html = _escape(str(sender))
    to_html = _escape(to)

    subject = f"[PatchCenter] Test SMTP — {now_str}"
    html = f"""<!DOCTYPE html><html><body style="font-family:'Segoe UI',Arial,sans-serif;background:#f3f4f6;padding:24px;">
<table role="presentation" width="600" cellspacing="0" cellpadding="0" style="background:#fff;border-radius:8px;margin:0 auto;box-shadow:0 1px 3px rgba(0,0,0,.08);">
<tr><td style="background:linear-gradient(90deg,#1e3a8a 0%,#1e40af 100%);padding:20px 28px;color:#fff;border-radius:8px 8px 0 0;">
<div style="font-size:11px;letter-spacing:0.15em;text-transform:uppercase;opacity:0.85;">SANEF — SecOps · PatchCenter</div>
<h1 style="margin:6px 0 0;font-size:20px;font-weight:600;">✅ Test SMTP réussi</h1>
</td></tr>
<tr><td style="padding:24px 28px;color:#1f2937;font-size:14px;line-height:1.6;">
<p>Bonjour,</p>
<p>Ce mail confirme que la configuration SMTP de <strong>PatchCenter</strong> fonctionne correctement.</p>
<table role="presentation" width="100%" cellspacing="0" cellpadding="0"
style="border-left:4px solid #2563eb;background:#eff6ff;border-radius:4px;margin:16px 0;">
<tr><td style="padding:14px 18px;">
<table role="presentation" width="100%" cellspacing="0" cellpadding="0" style="font-size:13px;">
<tr><td style="padding:3px 0;color:#6b7280;font-weight:600;width:40%;">Envoyé par :</td>
<td style="padding:3px 0;color:#1f2937;">{sender_html}</td></tr>
<tr><td style="padding:3px 0;color:#6b7280;font-weight:600;">Date d'envoi :</td>
<td style="padding:3px 0;color:#1f2937;">{now_str}</td></tr>
<tr><td style="padding:3px 0;color:#6b7280;font-weight:600;">Destinataire :</td>
<td style="padding:3px 0;color:#1f2937;">{to_html}</td></tr>
</table>
</td></tr>
</table>
<p style="color:#6b7280;font-size:13px;">Tu peux maintenant utiliser le bouton "Prévenance PCT" sur la page d'import patching.</p>
<p style="margin:24px 0 0;color:#6b7280;font-size:13px;">Cordialement,<br>L'équipe SecOps SANEF</p>
</td></tr>
<tr><td style="background:#f9fafb;padding:12px 28px;border-top:1px solid #e5e7eb;color:#9ca3af;font-size:11px;border-radius:0 0 8px 8px;">
Mail de test généré par <strong>PatchCenter</strong> via <code>/settings/smtp/test</code>.
</td></tr>
</table></body></html>"""

    from ..services.mail_service import send_html_mail
    res = send_html_mail(db, to=[to], subject=subject, html=html)
    return JSONResponse(res)
|
|
|
|
|
|
@router.post("/settings/ldap/test")
async def settings_ldap_test(request: Request, db=Depends(get_db)):
    """Run the LDAP connection test and return its result as JSON.

    Responds 401 when nobody is authenticated and 403 when the user lacks
    the "settings" edit permission; otherwise returns the value produced by
    ``ldap_service.test_connection(db)``.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    if not can_edit(get_user_perms(db, user), "settings"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.ldap_service import test_connection
    outcome = test_connection(db)
    return JSONResponse(outcome)
|
|
|
|
|
|
# --- vCenter CRUD ---
|
|
|
|
@router.post("/settings/vcenter/add", response_class=HTMLResponse)
async def vcenter_add(request: Request, db=Depends(get_db),
                      vc_name: str = Form(...), vc_endpoint: str = Form(...),
                      vc_datacenter: str = Form(""), vc_description: str = Form(""),
                      vc_responsable: str = Form("")):
    """Insert a new vCenter row and re-render the settings page.

    Args:
        vc_name, vc_endpoint: required form fields.
        vc_datacenter, vc_description, vc_responsable: optional form fields;
            empty strings are stored as NULL.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="vsphere"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    db.execute(
        text(
            "INSERT INTO vcenters (name, endpoint, datacenter, description, responsable) VALUES (:n, :e, :dc, :desc, :resp)"
        ),
        {
            "n": vc_name,
            "e": vc_endpoint,
            "dc": vc_datacenter or None,
            "desc": vc_description or None,
            "resp": vc_responsable or None,
        },
    )
    db.commit()

    ctx = _build_context(db, user, saved="vsphere")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/vcenter/{vc_id}/delete", response_class=HTMLResponse)
async def vcenter_delete(request: Request, vc_id: int, db=Depends(get_db)):
    """Deactivate a vCenter (sets ``is_active = false``) and re-render the page.

    The row is kept; only its ``is_active`` flag is updated.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="vsphere"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    db.execute(text("UPDATE vcenters SET is_active = false WHERE id = :id"), {"id": vc_id})
    db.commit()

    ctx = _build_context(db, user, saved="vsphere")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
# --- Teams channels CRUD ---
|
|
|
|
@router.post("/settings/teams-channel/add", response_class=HTMLResponse)
async def teams_channel_add(request: Request, db=Depends(get_db),
                            tc_name: str = Form(...),
                            tc_mode: str = Form("sharepoint"),
                            tc_sp_route: str = Form(""),
                            tc_webhook_url: str = Form(""),
                            tc_description: str = Form(""),
                            tc_is_default: str = Form(""),
                            tc_is_reboot_channel: str = Form(""),
                            tc_is_dynamic_dm: str = Form("")):
    """Create a Teams channel and re-render the settings page.

    Args:
        tc_name: channel name (stripped before storage).
        tc_mode: "sharepoint" or "webhook"; any other value falls back to
            "sharepoint".
        tc_sp_route, tc_webhook_url, tc_description: optional values, stored
            as NULL when empty.
        tc_is_default, tc_is_reboot_channel, tc_is_dynamic_dm: checkbox-style
            flags; any non-empty value counts as True.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="teams"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    mode = tc_mode if tc_mode in ("sharepoint", "webhook") else "sharepoint"
    is_def = bool(tc_is_default)
    is_reboot = bool(tc_is_reboot_channel)
    is_dyn_dm = bool(tc_is_dynamic_dm)

    # Only one default channel and one reboot channel at a time: clear the
    # flag on the existing holder before inserting the new one.
    if is_def:
        db.execute(text("UPDATE teams_channels SET is_default=false WHERE is_default=true"))
    if is_reboot:
        db.execute(text("UPDATE teams_channels SET is_reboot_channel=false WHERE is_reboot_channel=true"))

    db.execute(text("""
        INSERT INTO teams_channels (name, mode, sp_route, webhook_url, description,
                                    is_default, is_reboot_channel, is_dynamic_dm)
        VALUES (:n, :m, :sp, :u, :d, :df, :rb, :dyn)
    """), {
        "n": tc_name.strip(), "m": mode,
        "sp": (tc_sp_route.strip() or None),
        "u": (tc_webhook_url.strip() or None),
        "d": (tc_description or None),
        "df": is_def, "rb": is_reboot, "dyn": is_dyn_dm,
    })
    db.commit()

    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/teams-channel/{tc_id}/edit", response_class=HTMLResponse)
async def teams_channel_edit(request: Request, tc_id: int, db=Depends(get_db),
                             tc_name: str = Form(...),
                             tc_mode: str = Form("sharepoint"),
                             tc_sp_route: str = Form(""),
                             tc_webhook_url: str = Form(""),
                             tc_description: str = Form(""),
                             tc_is_active: str = Form(""),
                             tc_is_default: str = Form(""),
                             tc_is_reboot_channel: str = Form(""),
                             tc_is_dynamic_dm: str = Form("")):
    """Update an existing Teams channel and re-render the settings page.

    Args:
        tc_id: id of the channel to update.
        tc_name: channel name (stripped before storage).
        tc_mode: "sharepoint" or "webhook"; any other value falls back to
            "sharepoint".
        tc_sp_route, tc_webhook_url, tc_description: optional values, stored
            as NULL when empty.
        tc_is_active, tc_is_default, tc_is_reboot_channel, tc_is_dynamic_dm:
            checkbox-style flags; any non-empty value counts as True.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="teams"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    mode = tc_mode if tc_mode in ("sharepoint", "webhook") else "sharepoint"
    is_def = bool(tc_is_default)
    is_act = bool(tc_is_active)
    is_reboot = bool(tc_is_reboot_channel)
    is_dyn_dm = bool(tc_is_dynamic_dm)

    # Only one default channel and one reboot channel at a time: clear the
    # flag on every other channel before updating this one.
    if is_def:
        db.execute(text("UPDATE teams_channels SET is_default=false WHERE is_default=true AND id<>:id"),
                   {"id": tc_id})
    if is_reboot:
        db.execute(text("UPDATE teams_channels SET is_reboot_channel=false WHERE is_reboot_channel=true AND id<>:id"),
                   {"id": tc_id})

    db.execute(text("""
        UPDATE teams_channels
        SET name=:n, mode=:m, sp_route=:sp, webhook_url=:u, description=:d,
            is_default=:df, is_active=:ia,
            is_reboot_channel=:rb, is_dynamic_dm=:dyn
        WHERE id=:id
    """), {
        "n": tc_name.strip(), "m": mode,
        "sp": (tc_sp_route.strip() or None),
        "u": (tc_webhook_url.strip() or None),
        "d": (tc_description or None),
        "df": is_def, "ia": is_act,
        "rb": is_reboot, "dyn": is_dyn_dm, "id": tc_id,
    })
    db.commit()

    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/teams-channel/{tc_id}/delete", response_class=HTMLResponse)
async def teams_channel_delete(request: Request, tc_id: int, db=Depends(get_db)):
    """Delete a Teams channel row and re-render the settings page.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="teams"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    db.execute(text("DELETE FROM teams_channels WHERE id=:id"), {"id": tc_id})
    db.commit()

    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/teams-channel/{tc_id}/test")
async def teams_channel_test(request: Request, tc_id: int, db=Depends(get_db)):
    """Send a test notification through one Teams channel, according to its mode.

    - mode='sharepoint': calls ``write_sharepoint_notification`` with the
      configured ``sharepoint_notif_path`` and the channel's ``sp_route``.
    - mode='webhook': calls ``send_test_message`` on the channel's
      ``webhook_url``.

    Returns:
        JSONResponse: 401 when unauthenticated, 403 without the "settings"
        edit permission, 404 when the channel does not exist, 400 when the
        channel or the base path is not configured (or the mode is unknown),
        500 when the underlying service raises, otherwise the service result.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    row = db.execute(text("""
        SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
        FROM teams_channels WHERE id=:id
    """), {"id": tc_id}).fetchone()
    if not row:
        return JSONResponse({"ok": False, "msg": "Canal introuvable"}, status_code=404)

    sender = user.get("username") or "PatchCenter"
    try:
        if row.mode == "sharepoint":
            from ..services.teams_service import write_sharepoint_notification
            sp_base = (get_secret(db, "sharepoint_notif_path") or "").strip()
            if not sp_base:
                return JSONResponse({"ok": False,
                                     "msg": "sharepoint_notif_path non configuré (Settings > Teams > Notifications)"},
                                    status_code=400)
            if not row.sp_route:
                return JSONResponse({"ok": False, "msg": "sp_route non défini sur ce canal"},
                                    status_code=400)
            dyn_email = None
            if row.is_dynamic_dm:
                # Test mode: target the current user's own e-mail when known.
                u_row = db.execute(text("""
                    SELECT email FROM users WHERE username=:u OR email=:u LIMIT 1
                """), {"u": sender}).fetchone()
                dyn_email = (u_row.email if u_row else "test.user@example.com")
            result = write_sharepoint_notification(
                sp_base=sp_base, sp_route=row.sp_route, msg_type="debut",
                server_name=f"TEST-PATCHCENTER-{row.name}", intervenant=sender,
                dynamic_to_email=dyn_email,
            )
            return JSONResponse(result)
        elif row.mode == "webhook":
            if not row.webhook_url:
                return JSONResponse({"ok": False, "msg": "webhook_url non défini"}, status_code=400)
            from ..services.teams_service import send_test_message
            result = send_test_message(row.webhook_url, row.name, sender)
            return JSONResponse(result)
        else:
            return JSONResponse({"ok": False, "msg": f"Mode inconnu: {row.mode}"}, status_code=400)
    except Exception as e:
        # Boundary handler: the caller only gets a short message, so keep the
        # full traceback in the server log.
        logger.exception("teams_channel_test failed for channel id=%s", tc_id)
        return JSONResponse({"ok": False, "msg": f"Erreur: {e}"}, status_code=500)
|
|
|
|
|
|
# --- Teams channel rules CRUD ---
|
|
|
|
def _parse_csv_text_array(s: str):
|
|
"""'a, b ,c' -> ['a','b','c'] ; vide -> None."""
|
|
if not s:
|
|
return None
|
|
items = [p.strip() for p in s.split(",")]
|
|
items = [p for p in items if p]
|
|
return items or None
|
|
|
|
|
|
def _parse_db_filter(s: str):
|
|
"""'true'/'1'/'yes' -> True ; 'false'/'0'/'no' -> False ; '' -> None (pas filtré)."""
|
|
if s is None: return None
|
|
s = s.strip().lower()
|
|
if s in ("", "any", "null", "none", "-"): return None
|
|
if s in ("true", "1", "yes", "y", "oui", "db"): return True
|
|
if s in ("false", "0", "no", "n", "non", "nondb", "non-db"): return False
|
|
return None
|
|
|
|
|
|
@router.post("/settings/teams-rule/add", response_class=HTMLResponse)
async def teams_rule_add(request: Request, db=Depends(get_db),
                         tr_priority: int = Form(100),
                         tr_name: str = Form(...),
                         tr_channel_id: int = Form(...),
                         tr_match_responsable_contact_id: str = Form(""),
                         tr_match_application_domain: str = Form(""),
                         tr_match_env_in: str = Form(""),
                         tr_match_msg_type_in: str = Form(""),
                         tr_match_hostname_pattern: str = Form(""),
                         tr_match_is_database_server: str = Form("")):
    """Create a Teams channel routing rule and re-render the settings page.

    Args:
        tr_priority: rule priority (default 100).
        tr_name: rule name (stripped before storage).
        tr_channel_id: id of the target channel.
        tr_match_responsable_contact_id: optional contact id as text; blank
            means no constraint. A non-numeric value raises ValueError.
        tr_match_application_domain, tr_match_hostname_pattern: optional
            text criteria, stored as NULL when blank.
        tr_match_env_in, tr_match_msg_type_in: comma-separated lists, parsed
            by ``_parse_csv_text_array``.
        tr_match_is_database_server: tri-state filter parsed by
            ``_parse_db_filter``.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="teams"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    resp_id = int(tr_match_responsable_contact_id) if tr_match_responsable_contact_id.strip() else None
    db.execute(text("""
        INSERT INTO teams_channel_rules
            (priority, name, channel_id,
             match_responsable_contact_id, match_application_domain,
             match_env_in, match_msg_type_in, match_hostname_pattern,
             match_is_database_server)
        VALUES (:p, :n, :ch, :rc, :dom, :env, :mt, :host, :db)
    """), {
        "p": tr_priority, "n": tr_name.strip(), "ch": tr_channel_id,
        "rc": resp_id,
        "dom": (tr_match_application_domain.strip() or None),
        "env": _parse_csv_text_array(tr_match_env_in),
        "mt": _parse_csv_text_array(tr_match_msg_type_in),
        "host": (tr_match_hostname_pattern.strip() or None),
        "db": _parse_db_filter(tr_match_is_database_server),
    })
    db.commit()

    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/teams-rule/{tr_id}/edit", response_class=HTMLResponse)
async def teams_rule_edit(request: Request, tr_id: int, db=Depends(get_db),
                          tr_priority: int = Form(100),
                          tr_name: str = Form(...),
                          tr_channel_id: int = Form(...),
                          tr_match_responsable_contact_id: str = Form(""),
                          tr_match_application_domain: str = Form(""),
                          tr_match_env_in: str = Form(""),
                          tr_match_msg_type_in: str = Form(""),
                          tr_match_hostname_pattern: str = Form(""),
                          tr_match_is_database_server: str = Form(""),
                          tr_is_active: str = Form("")):
    """Update a Teams channel routing rule and re-render the settings page.

    Args:
        tr_id: id of the rule to update.
        tr_priority: rule priority (default 100).
        tr_name: rule name (stripped before storage).
        tr_channel_id: id of the target channel.
        tr_match_responsable_contact_id: optional contact id as text; blank
            means no constraint. A non-numeric value raises ValueError.
        tr_match_application_domain, tr_match_hostname_pattern: optional
            text criteria, stored as NULL when blank.
        tr_match_env_in, tr_match_msg_type_in: comma-separated lists, parsed
            by ``_parse_csv_text_array``.
        tr_match_is_database_server: tri-state filter parsed by
            ``_parse_db_filter``.
        tr_is_active: checkbox-style flag; any non-empty value counts as True.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="teams"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    resp_id = int(tr_match_responsable_contact_id) if tr_match_responsable_contact_id.strip() else None
    db.execute(text("""
        UPDATE teams_channel_rules
        SET priority=:p, name=:n, channel_id=:ch,
            match_responsable_contact_id=:rc,
            match_application_domain=:dom,
            match_env_in=:env, match_msg_type_in=:mt,
            match_hostname_pattern=:host,
            match_is_database_server=:db,
            is_active=:ia
        WHERE id=:id
    """), {
        "p": tr_priority, "n": tr_name.strip(), "ch": tr_channel_id,
        "rc": resp_id,
        "dom": (tr_match_application_domain.strip() or None),
        "env": _parse_csv_text_array(tr_match_env_in),
        "mt": _parse_csv_text_array(tr_match_msg_type_in),
        "host": (tr_match_hostname_pattern.strip() or None),
        "db": _parse_db_filter(tr_match_is_database_server),
        "ia": bool(tr_is_active), "id": tr_id,
    })
    db.commit()

    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/teams-rule/{tr_id}/delete", response_class=HTMLResponse)
async def teams_rule_delete(request: Request, tr_id: int, db=Depends(get_db)):
    """Delete a Teams channel routing rule and re-render the settings page.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="teams"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    db.execute(text("DELETE FROM teams_channel_rules WHERE id=:id"), {"id": tr_id})
    db.commit()

    ctx = _build_context(db, user, saved="teams")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/teams-rule/test")
async def teams_rule_test(request: Request, db=Depends(get_db),
                          server_id: int = Form(...),
                          msg_type: str = Form("debut")):
    """Dry-run the channel resolution for (server_id, msg_type) without sending.

    Returns as JSON whatever ``resolve_channels_for_server`` produces: the
    channels that would be notified, with their source
    (rule:<name> / reboot / default). Responds 401 when unauthenticated and
    403 when the user lacks the "settings" view permission.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    if not can_view(get_user_perms(db, user), "settings"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.teams_service import resolve_channels_for_server
    return JSONResponse(resolve_channels_for_server(db, server_id, msg_type))
|
|
|
|
|
|
# --- Server clusters CRUD ---
|
|
|
|
@router.post("/settings/server-cluster/add", response_class=HTMLResponse)
async def server_cluster_add(request: Request, db=Depends(get_db),
                             sc_name: str = Form(...),
                             sc_description: str = Form(""),
                             sc_reboot_strategy: str = Form("sequential")):
    """Create a server cluster and re-render the settings page.

    Args:
        sc_name: cluster name (stripped before storage).
        sc_description: optional description, stored as NULL when empty.
        sc_reboot_strategy: "sequential" or "parallel"; any other value
            falls back to "sequential".

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="cluster"``. A failed insert is rolled back and logged; the
        page is rendered all the same.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    strat = sc_reboot_strategy if sc_reboot_strategy in ("sequential", "parallel") else "sequential"
    try:
        db.execute(text("""
            INSERT INTO server_clusters (name, description, reboot_strategy)
            VALUES (:n, :d, :s)
        """), {"n": sc_name.strip(), "d": (sc_description or None), "s": strat})
        db.commit()
    except Exception as e:
        db.rollback()
        # Most likely a unique-constraint violation on the name.
        logger.warning("server_cluster_add failed: %s", e)

    ctx = _build_context(db, user, saved="cluster")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/server-cluster/{sc_id}/edit", response_class=HTMLResponse)
async def server_cluster_edit(request: Request, sc_id: int, db=Depends(get_db),
                              sc_name: str = Form(...),
                              sc_description: str = Form(""),
                              sc_reboot_strategy: str = Form("sequential"),
                              sc_is_active: str = Form("")):
    """Update a server cluster and re-render the settings page.

    Args:
        sc_id: id of the cluster to update.
        sc_name: cluster name (stripped before storage).
        sc_description: optional description, stored as NULL when empty.
        sc_reboot_strategy: "sequential" or "parallel"; any other value
            falls back to "sequential".
        sc_is_active: checkbox-style flag; any non-empty value counts as True.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="cluster"``. A failed update is rolled back and logged; the
        page is rendered all the same.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    strat = sc_reboot_strategy if sc_reboot_strategy in ("sequential", "parallel") else "sequential"
    try:
        db.execute(text("""
            UPDATE server_clusters
            SET name=:n, description=:d, reboot_strategy=:s, is_active=:ia
            WHERE id=:id
        """), {"n": sc_name.strip(), "d": (sc_description or None), "s": strat,
               "ia": bool(sc_is_active), "id": sc_id})
        db.commit()
    except Exception as e:
        # Same handling as server_cluster_add: renaming onto an existing
        # name must not surface as an unhandled 500 with a dirty session.
        db.rollback()
        logger.warning("server_cluster_edit failed: %s", e)

    ctx = _build_context(db, user, saved="cluster")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/server-cluster/{sc_id}/delete", response_class=HTMLResponse)
async def server_cluster_delete(request: Request, sc_id: int, db=Depends(get_db)):
    """Delete a server cluster, or only deactivate it while servers reference it.

    Returns:
        A 303 redirect to /login (unauthenticated) or /settings (no
        "settings" edit permission), otherwise the settings page with
        ``saved="cluster"``.
    """
    # 303 so the browser follows the redirect with a GET instead of
    # replaying this POST (the default 307 preserves the method).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    # Check whether any server is still attached to this cluster.
    cnt = db.execute(text("SELECT COUNT(*) FROM servers WHERE cluster_id=:id"),
                     {"id": sc_id}).scalar() or 0
    if cnt > 0:
        # Deactivate only, so foreign keys and history are not broken.
        db.execute(text("UPDATE server_clusters SET is_active=false WHERE id=:id"), {"id": sc_id})
    else:
        db.execute(text("DELETE FROM server_clusters WHERE id=:id"), {"id": sc_id})
    db.commit()

    ctx = _build_context(db, user, saved="cluster")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.get("/settings/server-cluster/{sc_id}/servers")
async def server_cluster_servers(request: Request, sc_id: int, db=Depends(get_db)):
    """Return the servers attached to a cluster as JSON (used by the detail panel).

    Responses:
        401 ``{"ok": False, "msg": ...}`` when no user is authenticated.
        403 ``{"ok": False, "msg": ...}`` when the user may not view "settings".
        200 ``{"ok": True, "count": N, "servers": [...]}`` otherwise, where
        each entry carries id, hostname, order (the row's cluster_order),
        machine_type and etat. Rows are sorted by cluster_order (NULLs last),
        then hostname.
    """
    from fastapi.responses import JSONResponse

    def _refuse(message, status):
        # Shared shape for both error answers.
        return JSONResponse({"ok": False, "msg": message}, status_code=status)

    current_user = get_current_user(request)
    if not current_user:
        return _refuse("Non authentifié", 401)

    granted = get_user_perms(db, current_user)
    if not can_view(granted, "settings"):
        return _refuse("Permission refusée", 403)

    query = text("""
        SELECT id, hostname, cluster_order, machine_type, etat
        FROM servers
        WHERE cluster_id = :id
        ORDER BY cluster_order NULLS LAST, hostname
    """)
    members = db.execute(query, {"id": sc_id}).fetchall()

    servers = []
    for member in members:
        servers.append({
            "id": member.id,
            # hostname is passed through str() before serialization
            "hostname": str(member.hostname),
            "order": member.cluster_order,
            "machine_type": member.machine_type,
            "etat": member.etat,
        })

    payload = {"ok": True, "count": len(members), "servers": servers}
    return JSONResponse(payload)
|
|
|
|
|
|
# --- Secret individuel ---
|
|
|
|
@router.post("/settings/secret/update", response_class=HTMLResponse)
async def secret_update(request: Request, db=Depends(get_db),
                        secret_key: str = Form(...), secret_value: str = Form(...)):
    """Store a new value for a single secret and re-render the settings page.

    The secret is written only when ``secret_value`` is non-empty and differs
    from the literal "********" (presumably the mask shown by the form for an
    unchanged value; confirm against the template). The description already
    stored in ``app_secrets`` for this key is kept; when the key has no row
    yet, the key itself is used as the description.

    Returns:
        A 303 redirect to /login when no user is authenticated, a 303
        redirect to /settings when the user may not edit "settings",
        otherwise settings.html rendered with the context built by
        _build_context(..., saved="secret").
    """
    user = get_current_user(request)
    if not user:
        # 303 makes the browser follow up with a GET. The default 307 would
        # replay this POST, including the secret value, against /login.
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    if secret_value and secret_value != "********":
        # Fetch the existing description so the update does not overwrite it.
        existing = db.execute(text("SELECT description FROM app_secrets WHERE key = :k"),
                              {"k": secret_key}).fetchone()
        desc = existing.description if existing else secret_key
        set_secret(db, secret_key, secret_value, desc)

    ctx = _build_context(db, user, saved="secret")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
# --- Réseaux autorisés ---
|
|
|
|
def _regen_nginx_acl(db):
    """Regenerate the nginx ACL file from the active rows of allowed_networks.

    Writes one ``allow <cidr>;`` line per active network, followed by
    ``deny all;``, to /etc/nginx/patchcenter_acl.conf. The nginx
    configuration is then checked with ``nginx -t`` and nginx is reloaded
    through ``systemctl reload nginx``.

    When ``nginx -t`` rejects the configuration, the previous content of the
    ACL file (if the file existed) is written back, so a configuration that
    nginx refuses is not left on disk for the next restart.

    Returns:
        True when the file was written, the configuration check passed and
        the reload command succeeded. False on any failure; the error is
        logged and never propagated to the caller.
    """
    import subprocess

    acl_path = "/etc/nginx/patchcenter_acl.conf"
    log = logging.getLogger(__name__)

    nets = db.execute(text("SELECT cidr FROM allowed_networks WHERE is_active = true ORDER BY cidr")).fetchall()
    lines = ["# Généré par PatchCenter — ne pas éditer manuellement"]
    lines.extend(f"allow {n.cidr};" for n in nets)
    lines.append("deny all;")
    # Encode explicitly: the header is non-ASCII, so relying on the process
    # locale could make the write fail after the file was already truncated.
    payload = ("\n".join(lines) + "\n").encode("utf-8")

    try:
        # Keep the current file as raw bytes so it can be restored verbatim.
        try:
            with open(acl_path, "rb") as f:
                previous = f.read()
        except FileNotFoundError:
            previous = None

        with open(acl_path, "wb") as f:
            f.write(payload)

        try:
            subprocess.run(["nginx", "-t"], capture_output=True, check=True)
        except Exception:
            # The check failed or could not run: put the old ACL back.
            if previous is not None:
                with open(acl_path, "wb") as f:
                    f.write(previous)
            raise

        # check=True: a failed reload must not be reported as success.
        subprocess.run(["systemctl", "reload", "nginx"], capture_output=True, check=True)
        return True
    except Exception as exc:
        # Best effort by design: callers render the page regardless, so log
        # the reason (including the command's stderr, if any) and report False.
        stderr = getattr(exc, "stderr", None)
        detail = stderr.decode("utf-8", "replace").strip() if isinstance(stderr, bytes) else ""
        log.exception("nginx ACL regeneration failed (%s) %s", acl_path, detail)
        return False
|
|
|
|
|
|
@router.post("/settings/network/add")
async def network_add(request: Request, db=Depends(get_db),
                      cidr: str = Form(...), description: str = Form("")):
    """Add an allowed network, regenerate the nginx ACL and re-render settings.

    Form fields:
        cidr: an IPv4/IPv6 address or network. It is validated with the
            standard ``ipaddress`` module and stored in normalized
            ``address/prefix`` form (host bits cleared, a bare address gets
            its full-length prefix). Invalid input is rejected without
            touching the database.
        description: free text; an empty value is stored as NULL.

    Returns:
        A 303 redirect to /login when no user is authenticated, a 303
        redirect to /settings when the user may not edit "settings" or the
        CIDR is invalid, otherwise settings.html rendered with the context
        built by _build_context(..., saved="security").
    """
    import ipaddress

    user = get_current_user(request)
    if not user:
        # 303 makes the browser follow up with a GET. The default 307 would
        # replay this POST against the target URL.
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    # The stored value is later interpolated into an nginx "allow ...;"
    # directive, so only a well-formed network may reach the database.
    candidate = cidr.strip()
    try:
        if "%" in candidate:
            # A scoped IPv6 literal ("%zone") has no use in this ACL and its
            # zone part may carry arbitrary characters; reject it outright.
            raise ValueError("IPv6 scope id not allowed")
        network = ipaddress.ip_network(candidate, strict=False)
    except ValueError:
        logging.getLogger(__name__).warning("Rejected invalid allowed network %r", cidr)
        return RedirectResponse(url="/settings", status_code=303)

    db.execute(text("INSERT INTO allowed_networks (cidr, description) VALUES (:c, :d)"),
               {"c": str(network), "d": description or None})
    db.commit()
    # The result is not checked: the page is rendered even if the reload failed.
    _regen_nginx_acl(db)

    ctx = _build_context(db, user, saved="security")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/network/{net_id}/delete")
async def network_delete(request: Request, net_id: int, db=Depends(get_db)):
    """Delete an allowed network, regenerate the nginx ACL and re-render settings.

    Returns:
        A 303 redirect to /login when no user is authenticated, a 303
        redirect to /settings when the user may not edit "settings",
        otherwise settings.html rendered with the context built by
        _build_context(..., saved="security").
    """
    user = get_current_user(request)
    if not user:
        # 303 makes the browser follow up with a GET. The default 307 would
        # replay this POST against the target URL.
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    db.execute(text("DELETE FROM allowed_networks WHERE id = :id"), {"id": net_id})
    db.commit()
    # The result is not checked: the page is rendered even if the reload failed.
    _regen_nginx_acl(db)

    ctx = _build_context(db, user, saved="security")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|
|
|
|
|
|
@router.post("/settings/network/{net_id}/toggle")
async def network_toggle(request: Request, net_id: int, db=Depends(get_db)):
    """Flip ``is_active`` on an allowed network, regenerate the nginx ACL and re-render settings.

    Returns:
        A 303 redirect to /login when no user is authenticated, a 303
        redirect to /settings when the user may not edit "settings",
        otherwise settings.html rendered with the context built by
        _build_context(..., saved="security").
    """
    user = get_current_user(request)
    if not user:
        # 303 makes the browser follow up with a GET. The default 307 would
        # replay this POST against the target URL.
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "settings"):
        return RedirectResponse(url="/settings", status_code=303)

    # The inversion is done in SQL, so no prior read of the row is needed.
    db.execute(text("UPDATE allowed_networks SET is_active = NOT is_active WHERE id = :id"), {"id": net_id})
    db.commit()
    # The result is not checked: the page is rendered even if the reload failed.
    _regen_nginx_acl(db)

    ctx = _build_context(db, user, saved="security")
    ctx["request"] = request
    return templates.TemplateResponse("settings.html", ctx)
|