patchcenter/app/services/teams_service.py
Admin MPCZ 29f6153370 feat(pct): workflow prevenance PCT (auto-detection + gate confirmation + suffixe Teams)
- Migration migrate_pct_workflow_20260507.sql: ajoute patch_planning_import_rows
  pct_required (boolean default false), pct_confirmed_at (timestamptz),
  pct_confirmed_by_user_id (FK users). Backfill depuis servers.pct_required.
- Auto-detection a l'import (planning_import.py): scan referent_technique +
  mode_operatoire + impacts + commentaire pour pattern \bPCT\b mot entier
  (insensible casse) -> pct_required=true sur la row. Propage egalement vers
  servers.pct_required si pas deja true.
- UI iexec: badge orange '⚠ Prév PCT à faire' sur la cellule asset_name si
  pct_required=true et pas confirme, badge vert ' PCT ok' une fois confirme.
- Gate avant Step 3 (PATCH REEL): scan des serveurs cibles, si certains ont
  pct_required && !pct_confirmed -> 2 confirmations successives + appel
  POST /patching/iexec/confirm-pct qui marque pct_confirmed_at + user_id.
  Ne lance pas le patch si l'operateur annule.
- Endpoint POST /patching/iexec/confirm-pct: marque les rows comme PCT confirmes
  (pct_confirmed_at = now(), pct_confirmed_by_user_id = current user).
- Notif Teams: send_notification accepte planning_row_id optionnel ; si la row
  a pct_required && pct_confirmed, le message debut/fin est suffixe par
  ' (Prévenance PCT ok)' pour informer le responsable que l'amont a ete gere.
2026-05-07 08:19:19 +02:00

535 lines
24 KiB
Python

"""Service Microsoft Teams — deux modes d'envoi :
1. **mode='sharepoint'** (recommended for SANEF) — writes a .txt file into a
   synchronized OneDrive sub-folder. A Power Automate Workflow on the SharePoint
   side watches the folder and posts to Teams. No OAuth required. The format is
   identical to the Sanef Patch Manager .exe so existing Workflows can be reused.
2. **mode='webhook'** (future, requires OAuth on the SANEF tenant) — direct POST
   to the Power Automate Direct API URL with an Entra ID Bearer token.
Server → channel/conversation routing is driven by the `teams_channel_rules`
table (rules-based) and resolved by `resolve_channels_for_server`
(`resolve_channel_for_server` is the legacy single-channel wrapper around it).
'sharepoint' mode with is_dynamic_dm=true: the file is prefixed with a
`TO: <email>` line that the PA Workflow reads to post dynamically to a
recipient, without having to create one workflow per owner.
"""
import json
import logging
import os
import threading
from datetime import datetime
from typing import Dict, Any, Optional, List
import requests
# Module-level logger shared by every function in this service.
log = logging.getLogger("patchcenter.teams")
# Timeout handed to requests.post() for webhook calls (requests reads it as seconds).
HTTP_TIMEOUT = 10
# Proxy URL applied to both http and https webhook calls; None disables proxying.
PROXY_URL = None  # to be overridden via Settings if needed
# ───────────────────────────────────────────────────────────────
# MODE SHAREPOINT — écriture fichier dans dossier OneDrive sync
# ───────────────────────────────────────────────────────────────
def format_message_text(server_name: str, msg_type: str, intervenant: str,
                        pct_confirmed: bool = False) -> str:
    """Build the flat text line posted to Teams through the Power Automate workflow.

    The wording mirrors the Sanef Patch Manager .exe so that the existing
    SharePoint-side workflows keep working unchanged.

    Args:
        server_name: Host name inserted at the end of the message.
        msg_type: "debut", "reboot", "fin" or "annulation"; any other value is
            echoed verbatim in a generic message.
        intervenant: Operator name; falls back to "SecOps" when empty/None.
            Its first character is upper-cased.
        pct_confirmed: When True, " (Prévenance PCT ok)" is appended to the
            "debut" and "fin" messages only.

    Returns:
        The formatted single-line message (never empty).
    """
    who = intervenant if intervenant else "SecOps"
    who = who[0].upper() + who[1:]
    prefix = f"[SECOPS] : Intervenant({who}) => "
    # The PCT note is only meaningful on start/end messages.
    pct_note = " (Prévenance PCT ok)" if pct_confirmed else ""

    known_messages = {
        "debut": f"Debut d'intervention sur {server_name}{pct_note}",
        "reboot": f"Reboot suite MAJ de {server_name}",
        "fin": f"Fin d'intervention sur {server_name}{pct_note}",
        "annulation": f"Annulation intervention sur {server_name}",
    }
    tail = known_messages.get(msg_type)
    if tail is None:
        # Unknown type: generic "<type> <server>" message.
        tail = f"{msg_type} {server_name}"
    return prefix + tail
def write_sharepoint_notification(sp_base: str, sp_route: str, msg_type: str,
                                  server_name: str, intervenant: str,
                                  dynamic_to_email: Optional[str] = None,
                                  pct_confirmed: bool = False) -> Dict[str, Any]:
    """Write a notification .txt file into <sp_base>/<sp_route>/.

    File name: <msg_type>_<server>_<YYYYMMDD_HHMMSS>.txt. If a file with that
    name already exists (same type/host within the same second), a numeric
    suffix "_<n>" is added instead of overwriting the earlier notification.

    File content:
      - with dynamic_to_email: first line `TO: <email>`, then the message;
      - otherwise: just the flat text message.
    When pct_confirmed is True the message carries the "(Prévenance PCT ok)"
    suffix (see format_message_text). The sub-folder is created when missing.

    Args:
        sp_base: Root folder of the notification tree.
        sp_route: Sub-folder (relative to sp_base) for the target channel.
        msg_type: Message type, also used as file-name prefix.
        server_name: Host name; sanitised and truncated to 80 chars in the file name.
        intervenant: Operator name passed to format_message_text.
        dynamic_to_email: Optional recipient written as a `TO:` header line.
        pct_confirmed: Forwarded to format_message_text.

    Returns:
        {"ok": True, "path", "detail"} on success, {"ok": False, "msg"} otherwise.
        Never raises: any failure is logged and reported in the returned dict.
    """
    if not sp_base or not sp_route:
        return {"ok": False, "msg": "sharepoint_notif_path ou sp_route manquant"}
    body = format_message_text(server_name, msg_type, intervenant,
                               pct_confirmed=pct_confirmed)
    if not body:
        return {"ok": False, "msg": f"msg_type inconnu: {msg_type}"}
    if dynamic_to_email:
        content = f"TO: {dynamic_to_email}\n{body}\n"
    else:
        content = body + "\n"

    def _filename_safe(value: str) -> str:
        # Keep alphanumerics plus "-", "_", "."; everything else becomes "_".
        return "".join(c if c.isalnum() or c in ("-", "_", ".") else "_"
                       for c in value)

    max_attempts = 100  # upper bound on collision retries
    try:
        notif_dir = os.path.join(sp_base, sp_route)
        os.makedirs(notif_dir, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_host = _filename_safe(server_name or "unknown")[:80]
        # msg_type ends up in a path too: never let it carry separators.
        safe_type = _filename_safe(str(msg_type))
        stem = f"{safe_type}_{safe_host}_{ts}"
        for attempt in range(max_attempts):
            fname = f"{stem}.txt" if attempt == 0 else f"{stem}_{attempt}.txt"
            fpath = os.path.join(notif_dir, fname)
            try:
                # "x" = exclusive creation: fails instead of clobbering a
                # notification written earlier in the same second.
                with open(fpath, "x", encoding="utf-8") as f:
                    f.write(content)
            except FileExistsError:
                continue
            log.info("Teams SP notif écrite: %s", fpath)
            return {"ok": True, "path": fpath,
                    "detail": "Fichier écrit, en attente sync OneDrive"}
        log.error("Échec écriture SP notif: %s", "trop de collisions de nom de fichier")
        return {"ok": False, "msg": "Erreur écriture: trop de collisions de nom de fichier"}
    except Exception as e:
        # Best-effort boundary: a notification failure must not break patching.
        log.error("Échec écriture SP notif: %s", e)
        return {"ok": False, "msg": f"Erreur écriture: {e}"}
def write_sharepoint_notification_async(sp_base: str, sp_route: str, msg_type: str,
                                        server_name: str, intervenant: str,
                                        dynamic_to_email: Optional[str] = None,
                                        pct_confirmed: bool = False) -> None:
    """Non-blocking variant: run write_sharepoint_notification in a daemon thread.

    Takes the same arguments as write_sharepoint_notification, including
    pct_confirmed so the asynchronous path can also carry the PCT suffix.
    The result dict of the write is discarded; failures are only visible
    through the log output of write_sharepoint_notification.
    """
    threading.Thread(
        target=write_sharepoint_notification,
        args=(sp_base, sp_route, msg_type, server_name, intervenant, dynamic_to_email),
        kwargs={"pct_confirmed": pct_confirmed},
        daemon=True,
    ).start()
# ───────────────────────────────────────────────────────────────
# MODE WEBHOOK — POST direct (futur, OAuth requis)
# ───────────────────────────────────────────────────────────────
def _proxies():
    """Return the requests-style proxies mapping built from PROXY_URL, or None when unset."""
    return {"http": PROXY_URL, "https": PROXY_URL} if PROXY_URL else None
def _post(webhook_url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST a JSON payload to a Teams webhook.

    Returns:
        {"ok", "status", "detail"}: ok is True for any 2xx status. On a non-2xx
        answer, detail holds the first 500 characters of the response body; on a
        requests-level failure, status is 0 and detail describes the error.
    """
    try:
        response = requests.post(
            webhook_url,
            json=payload,
            timeout=HTTP_TIMEOUT,
            proxies=_proxies(),
        )
        success = 200 <= response.status_code < 300
        if success:
            detail = "Message envoyé"
        else:
            detail = (response.text or "")[:500]
        return {"ok": success, "status": response.status_code, "detail": detail}
    except requests.exceptions.RequestException as exc:
        return {"ok": False, "status": 0, "detail": f"Réseau: {exc}"}
def _adaptive_card(title: str, body_lines: list, color: str = "good") -> Dict[str, Any]:
"""Construit un payload Adaptive Card minimal (compatible Workflows).
color : 'good' (vert) | 'warning' (orange) | 'attention' (rouge) | 'default'."""
facts = []
body_blocks = [
{"type": "TextBlock", "text": title, "weight": "Bolder",
"size": "Medium", "color": color, "wrap": True}
]
for ln in body_lines:
if isinstance(ln, dict) and "title" in ln and "value" in ln:
facts.append({"title": ln["title"], "value": str(ln["value"])})
else:
body_blocks.append({"type": "TextBlock", "text": str(ln), "wrap": True,
"spacing": "Small"})
if facts:
body_blocks.append({"type": "FactSet", "facts": facts})
body_blocks.append({"type": "TextBlock",
"text": f"_{datetime.now().strftime('%Y-%m-%d %H:%M')} — PatchCenter_",
"size": "Small", "isSubtle": True, "spacing": "Medium"})
return {
"type": "message",
"attachments": [{
"contentType": "application/vnd.microsoft.card.adaptive",
"contentUrl": None,
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.4",
"body": body_blocks,
}
}]
}
def resolve_channels_for_server(db, server_id: int, msg_type: str = "debut") -> Dict[str, Any]:
    """Resolve the list of target Teams channels for a given server and msg_type.

    Algorithm:
      1. If msg_type='reboot' → only the `is_reboot_channel=true` channel
         (NO fan-out; reboots go to a single place, the general DSI channel).
      2. servers.teams_channel_id (explicit override) → only that channel,
         no fan-out either (override = "I know what I want"). If the override
         channel is not found among active channels, resolution falls through
         to step 3.
      3. Otherwise: **fan-out** over all active rules that match.
         Every rule whose conditions match contributes one channel
         (multi-recipient: owner + technical referents + DBA + …). A channel
         reached by several rules is kept only once (first matching rule, in
         priority then id order).
      4. If no rule matched → fallback to the `is_default=true` channel.

    Rule match conditions:
      - match_msg_type_in: msg_type must be in the array (NULL/empty = all)
      - match_responsable_contact_id: must equal servers.responsable_domaine_contact_id
      - match_application_domain: case-insensitive substring of applications.domain
      - match_env_in: server environment is in the array (case-insensitive)
      - match_hostname_pattern: case-insensitive substring of hostname
      - match_is_database_server: NULL = not filtered; true = DB servers only;
        false = non-DB servers only

    For each retained `is_dynamic_dm=true` channel, a `dynamic_to_email` is
    computed depending on the nature of the rule (owner / referent / DBA pool).

    Args:
        db: Database session exposing `execute()`; queries are wrapped with
            sqlalchemy.text.
        server_id: Primary key of the server in `servers`.
        msg_type: Notification type used for rule filtering ("reboot" is special-cased).

    Returns:
        {
          ok: bool,
          msg (when ok=False),
          server: {id, hostname, ...}   (absent when the server is not found),
          channels: list[ {channel_id, name, mode, sp_route, webhook_url,
                           is_dynamic_dm, dynamic_to_email, source} ],
        }
    """
    from sqlalchemy import text as sqlt
    if not server_id:
        return {"ok": False, "msg": "server_id manquant", "channels": []}

    # Fetch one channel by id; inactive channels are treated as missing (None).
    def _channel_row(channel_id):
        return db.execute(sqlt("""
SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
FROM teams_channels
WHERE id = :id AND is_active = true
"""), {"id": channel_id}).fetchone()

    # Return contacts.teams_upn for a contact id, or None when the id is
    # falsy, the contact does not exist, or its teams_upn is empty.
    def _contact_upn(contact_id):
        if not contact_id:
            return None
        c = db.execute(sqlt("SELECT teams_upn FROM contacts WHERE id = :id"),
                       {"id": contact_id}).fetchone()
        return c.teams_upn if c and c.teams_upn else None

    # Server row joined with its application's domain (LEFT JOIN: may be NULL).
    s = db.execute(sqlt("""
SELECT s.id, s.hostname, s.environnement, s.application_id,
s.responsable_domaine_contact_id, s.referent_technique_contact_id,
s.teams_channel_id AS server_channel_id,
COALESCE(s.is_database_server, false) AS is_db,
a.domain AS app_domain
FROM servers s
LEFT JOIN applications a ON a.id = s.application_id
WHERE s.id = :sid
"""), {"sid": server_id}).fetchone()
    if not s:
        return {"ok": False, "msg": f"Serveur id={server_id} introuvable", "channels": []}
    # Summary of the server echoed back to the caller in every later return.
    server_info = {
        "id": s.id, "hostname": s.hostname, "environnement": s.environnement,
        "domain": s.app_domain, "is_database_server": s.is_db,
        "responsable_domaine_contact_id": s.responsable_domaine_contact_id,
        "referent_technique_contact_id": s.referent_technique_contact_id,
    }
    # Additional referents (multi-referent support)
    extra_referents = db.execute(sqlt("""
SELECT contact_id FROM server_additional_referents WHERE server_id = :sid
"""), {"sid": server_id}).fetchall()
    extra_ref_ids = [r.contact_id for r in extra_referents]

    # Shape one resolved channel entry; `source` records why it was selected.
    def _build(ch, source, dynamic_to_email=None):
        return {
            "channel_id": ch.id, "name": ch.name,
            "mode": ch.mode, "sp_route": ch.sp_route,
            "webhook_url": ch.webhook_url,
            "is_dynamic_dm": ch.is_dynamic_dm,
            "dynamic_to_email": dynamic_to_email,
            "source": source,
        }

    # 1) Reboot → reboot channel only
    if msg_type == "reboot":
        ch = db.execute(sqlt("""
SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
FROM teams_channels
WHERE is_reboot_channel = true AND is_active = true
ORDER BY id LIMIT 1
""")).fetchone()
        if ch:
            dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None
            return {"ok": True, "server": server_info,
                    "channels": [_build(ch, "reboot", dyn)]}
        return {"ok": False, "msg": "Aucun canal reboot configuré (is_reboot_channel)",
                "server": server_info, "channels": []}
    # 2) Per-server override (wins over everything else)
    if s.server_channel_id:
        ch = _channel_row(s.server_channel_id)
        # An inactive/missing override channel is ignored and rules apply below.
        if ch:
            dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None
            return {"ok": True, "server": server_info,
                    "channels": [_build(ch, "server_override", dyn)]}
    # 3) Fan-out over all active rules that match
    rules = db.execute(sqlt("""
SELECT id, name, priority, channel_id,
match_responsable_contact_id, match_application_domain,
match_env_in, match_msg_type_in, match_hostname_pattern,
match_is_database_server
FROM teams_channel_rules
WHERE is_active = true
ORDER BY priority ASC, id ASC
""")).fetchall()
    # Normalised server attributes used by the rule comparisons below.
    server_env = (s.environnement or "").strip()
    server_dom = (s.app_domain or "").strip()
    server_host = (s.hostname or "").lower()
    matched_channels = []
    seen_channel_ids = set()
    for r in rules:
        # Each unset (NULL/empty) condition is skipped; the first failing one
        # rejects the rule.
        if r.match_msg_type_in and msg_type not in r.match_msg_type_in:
            continue
        if r.match_responsable_contact_id and \
                s.responsable_domaine_contact_id != r.match_responsable_contact_id:
            continue
        if r.match_application_domain:
            md = r.match_application_domain.strip().lower()
            # A server without application domain never matches a domain rule.
            if not server_dom or md not in server_dom.lower():
                continue
        if r.match_env_in:
            envs_lower = [e.lower() for e in r.match_env_in]
            if not server_env or server_env.lower() not in envs_lower:
                continue
        if r.match_hostname_pattern:
            pat = r.match_hostname_pattern.strip().lower()
            # A whitespace-only pattern is treated as "no filter".
            if pat and pat not in server_host:
                continue
        if r.match_is_database_server is not None:
            if bool(r.match_is_database_server) != bool(s.is_db):
                continue
        # All conditions match
        ch = _channel_row(r.channel_id)
        if not ch:
            continue
        if ch.id in seen_channel_ids:
            continue  # de-duplicate when several rules point to the same channel
        seen_channel_ids.add(ch.id)
        # Determine dynamic_to_email from the nature of the rule.
        # Heuristic: if the rule matches a responsable_contact_id → DM that contact
        # else if match_is_database_server=true → DBA pool (handled by the PA workflow)
        # else → DM the server's responsable_domaine
        dyn = None
        if ch.is_dynamic_dm:
            if r.match_responsable_contact_id:
                dyn = _contact_upn(r.match_responsable_contact_id)
            elif r.match_is_database_server:
                # For DB servers: the PA workflow is expected to know the recipients
                # (Nadine + Cedric). UPNs can be supplied through the server's
                # technical referent and additional referents when filled in;
                # otherwise left empty → the workflow decides.
                upns = []
                if s.referent_technique_contact_id:
                    u = _contact_upn(s.referent_technique_contact_id)
                    if u: upns.append(u)
                for cid in extra_ref_ids:
                    u = _contact_upn(cid)
                    if u and u not in upns: upns.append(u)
                # Several recipients are passed as one comma-separated string.
                dyn = ",".join(upns) if upns else None
            else:
                dyn = _contact_upn(s.responsable_domaine_contact_id)
        matched_channels.append(_build(ch, f"rule:{r.name}", dyn))
    if matched_channels:
        return {"ok": True, "server": server_info, "channels": matched_channels}
    # 4) Fallback to the default channel
    ch = db.execute(sqlt("""
SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
FROM teams_channels
WHERE is_default = true AND is_active = true
ORDER BY id LIMIT 1
""")).fetchone()
    if ch:
        dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None
        return {"ok": True, "server": server_info,
                "channels": [_build(ch, "default", dyn)]}
    return {"ok": False, "msg": "Aucun canal résolu (aucune règle ne matche, pas de canal défaut)",
            "server": server_info, "channels": []}
# Compat: legacy singular name — returns the first resolved channel (best-effort).
# Kept so that possible external callers do not break; to be removed eventually.
def resolve_channel_for_server(db, server_id: int, msg_type: str = "debut") -> Dict[str, Any]:
    """Legacy single-channel wrapper around resolve_channels_for_server.

    Returns:
        {"ok": True, ...fields of the first resolved channel...} or
        {"ok": False, "msg": ...} when resolution failed or found no channel.
    """
    resolved = resolve_channels_for_server(db, server_id, msg_type)
    channels = resolved.get("channels")
    if not (resolved.get("ok") and channels):
        return {"ok": False, "msg": resolved.get("msg", "Aucun canal résolu")}

    first = channels[0]
    exported_keys = (
        "channel_id", "name", "mode", "sp_route",
        "webhook_url", "is_dynamic_dm", "dynamic_to_email", "source",
    )
    flat = {"ok": True}
    flat.update((key, first[key]) for key in exported_keys)
    return flat
def send_notification(db, server_id: int, msg_type: str, intervenant: str,
                      planning_row_id: Optional[int] = None) -> Dict[str, Any]:
    """High-level orchestration: resolve the channels and send one notification per channel.

    'sharepoint' mode: writes one .txt file per channel (multi-recipient fan-out).
    'webhook' mode: not implemented until OAuth is in place; reported as a failure.

    Args:
        db: Database session exposing `execute()`.
        server_id: Primary key of the target server.
        msg_type: Notification type, forwarded to channel resolution and formatting.
        intervenant: Operator name shown in the message.
        planning_row_id: Optional id in `patch_planning_import_rows`. When given,
            pct_required + pct_confirmed_at are read from that row and, if both
            are set, the "(Prévenance PCT ok)" suffix is requested for the message.

    Returns:
        The failed resolution dict as-is when no channel could be resolved;
        otherwise {ok, server, results: list[{channel_name, source, ok, ...}],
        summary}, where ok is True as soon as one channel was notified.
    """
    from .secrets_service import get_secret

    res = resolve_channels_for_server(db, server_id, msg_type)
    if not res.get("ok"):
        return res

    # The resolver already loaded the servers row: reuse its hostname instead
    # of issuing a second SELECT for the same value.
    server_name = (res.get("server") or {}).get("hostname") or "unknown"

    pct_confirmed_flag = False
    if planning_row_id:
        from sqlalchemy import text as sqlt
        row = db.execute(sqlt("""
SELECT pct_required, pct_confirmed_at
FROM patch_planning_import_rows WHERE id = :id
"""), {"id": planning_row_id}).fetchone()
        # Suffix only when PCT notice was required AND has been confirmed.
        if row and row.pct_required and row.pct_confirmed_at:
            pct_confirmed_flag = True

    sp_base = (get_secret(db, "sharepoint_notif_path") or "").strip()
    results = []
    for ch in res["channels"]:
        if ch["mode"] == "sharepoint":
            if not sp_base:
                results.append({
                    "channel_name": ch["name"], "source": ch["source"],
                    "ok": False, "msg": "sharepoint_notif_path non configuré dans Settings",
                })
                continue
            out = write_sharepoint_notification(
                sp_base=sp_base, sp_route=ch["sp_route"], msg_type=msg_type,
                server_name=server_name, intervenant=intervenant,
                dynamic_to_email=ch.get("dynamic_to_email"),
                pct_confirmed=pct_confirmed_flag,
            )
            out["channel_name"] = ch["name"]
            out["source"] = ch["source"]
            results.append(out)
        elif ch["mode"] == "webhook":
            results.append({
                "channel_name": ch["name"], "source": ch["source"],
                "ok": False, "msg": "Mode webhook non implémenté (OAuth requis)",
            })
        else:
            results.append({
                "channel_name": ch["name"], "source": ch["source"],
                "ok": False, "msg": f"Mode inconnu: {ch['mode']}",
            })
    n_ok = sum(1 for r in results if r.get("ok"))
    return {
        "ok": n_ok > 0,
        "server": res["server"],
        "results": results,
        "summary": f"{n_ok}/{len(results)} canaux notifiés",
    }
# ───────────────────────────────────────────────────────────────
# Compat : helper webhook (à conserver pour le bouton "Tester" dans Settings)
# ───────────────────────────────────────────────────────────────
def send_test_message(webhook_url: str, channel_name: str, sender: str) -> Dict[str, Any]:
    """Post a test card to the webhook to validate its configuration.

    Returns the {ok, status, detail} dict produced by _post.
    """
    lines = [
        "Ce message confirme que le webhook Teams est correctement configuré.",
        {"title": "Envoyé par", "value": sender},
        {"title": "Source", "value": "PatchCenter > Settings > Teams"},
    ]
    card = _adaptive_card(
        title=f"✅ Test PatchCenter — canal '{channel_name}'",
        body_lines=lines,
        color="good",
    )
    return _post(webhook_url, card)
def send_intervention_start(webhook_url: str, hostname: str, application: str,
                            intervenant: str, planned_at: str = None) -> Dict[str, Any]:
    """Announce the START of a patching intervention.

    The "Prévu" fact is only included when planned_at is provided.
    Returns the {ok, status, detail} dict produced by _post.
    """
    scheduled = [{"title": "Prévu", "value": planned_at}] if planned_at else []
    facts = [
        {"title": "Serveur", "value": hostname},
        {"title": "Application", "value": application or ""},
        {"title": "Intervenant", "value": intervenant or ""},
    ] + scheduled
    card = _adaptive_card(
        title=f"🟧 Début intervention patching — {hostname}",
        body_lines=facts,
        color="warning",
    )
    return _post(webhook_url, card)
def send_intervention_end(webhook_url: str, hostname: str, application: str,
                          intervenant: str, status: str = "ok") -> Dict[str, Any]:
    """Announce the END of an intervention.

    Args:
        webhook_url: Target webhook URL.
        hostname: Patched server.
        application: Application name (empty string when None).
        intervenant: Operator name (empty string when None).
        status: 'ok' | 'ko'. Anything other than 'ok' (including None) is
            rendered as a failure.

    Returns the {ok, status, detail} dict produced by _post.
    """
    succeeded = status == "ok"
    color = "good" if succeeded else "attention"
    # NOTE(review): both icon literals were empty (glyphs apparently lost), which
    # made the conditional a no-op and left a leading space in the title.
    # ✅/❌ are chosen to match the ✅ already used by send_test_message — confirm.
    icon = "✅" if succeeded else "❌"
    payload = _adaptive_card(
        title=f"{icon} Fin intervention patching — {hostname}",
        body_lines=[
            {"title": "Serveur", "value": hostname},
            {"title": "Application", "value": application or ""},
            {"title": "Intervenant", "value": intervenant or ""},
            # Guard against status=None, which used to raise AttributeError.
            {"title": "Statut technique", "value": str(status or "").upper()},
            "_(En attente de validation par le responsable applicatif)_",
        ],
        color=color,
    )
    return _post(webhook_url, payload)
def send_planning_reminder(webhook_url: str, hostname: str, application: str,
                           jour: str, heure: str, intervenant: str) -> Dict[str, Any]:
    """Send a planning reminder (sendable from the preceding Thursday/Friday up to D-day).

    None values for application, jour, heure and intervenant are rendered as
    empty strings. Returns the {ok, status, detail} dict produced by _post.
    """
    fact_pairs = (
        ("Serveur", hostname),
        ("Application", application or ""),
        ("Date prévue", jour or ""),
        ("Heure prévue", heure or ""),
        ("Intervenant", intervenant or ""),
    )
    lines = [{"title": label, "value": value} for label, value in fact_pairs]
    lines.append("_Merci de confirmer la disponibilité applicative._")
    card = _adaptive_card(
        title=f"🗓 Rappel planning patching — {hostname}",
        body_lines=lines,
        color="default",
    )
    return _post(webhook_url, card)