"""Service Microsoft Teams — deux modes d'envoi : 1. **mode='sharepoint'** (recommandé pour SANEF) — écriture d'un fichier .txt dans un sous-dossier OneDrive synchronisé. Un Workflow Power Automate côté SharePoint surveille le dossier et poste sur Teams. Aucun OAuth requis. Format identique au .exe Sanef Patch Manager pour réutiliser les Workflows existants. 2. **mode='webhook'** (futur, OAuth requis sur tenant SANEF) — POST direct sur l'URL Power Automate Direct API avec un Bearer token Entra ID. Le routage serveur → canal/conversation est géré par la table `teams_channel_rules` (rules-based) et résolu par `resolve_channel_for_server`. Mode 'sharepoint' avec is_dynamic_dm=true : le fichier est préfixé par une ligne `TO: ` que le Workflow PA lit pour poster dynamiquement à un destinataire, sans avoir à créer un workflow par responsable. """ import json import logging import os import threading from datetime import datetime from typing import Dict, Any, Optional, List import requests log = logging.getLogger("patchcenter.teams") HTTP_TIMEOUT = 10 PROXY_URL = None # à override via Settings si besoin # ─────────────────────────────────────────────────────────────── # MODE SHAREPOINT — écriture fichier dans dossier OneDrive sync # ─────────────────────────────────────────────────────────────── def format_message_text(server_name: str, msg_type: str, intervenant: str, pct_confirmed: bool = False) -> str: """Formate le message texte plat (compatible workflow PA → Teams). Format identique au .exe Sanef Patch Manager pour réutiliser les workflows existants côté SharePoint. Si pct_confirmed=True (le serveur nécessitait une prévenance PCT et elle a été confirmée par l'opérateur avant le patching), un suffixe " (Prévenance PCT ok)" est ajouté pour signaler au responsable que l'amont a été géré.""" nom = (intervenant or "SecOps") nom = nom[:1].upper() + nom[1:] if nom else "SecOps" suffix = " (Prévenance PCT ok)" if pct_confirmed else "" if msg_type == "debut": return f"[SECOPS] : Intervenant({nom}) => Debut d'intervention sur {server_name}{suffix}" if msg_type == "reboot": return f"[SECOPS] : Intervenant({nom}) => Reboot suite MAJ de {server_name}" if msg_type == "fin": return f"[SECOPS] : Intervenant({nom}) => Fin d'intervention sur {server_name}{suffix}" if msg_type == "annulation": return f"[SECOPS] : Intervenant({nom}) => Annulation intervention sur {server_name}" return f"[SECOPS] : Intervenant({nom}) => {msg_type} {server_name}" def write_sharepoint_notification(sp_base: str, sp_route: str, msg_type: str, server_name: str, intervenant: str, dynamic_to_email: Optional[str] = None, pct_confirmed: bool = False) -> Dict[str, Any]: """Écrit un fichier .txt dans //. Format nom de fichier : __.txt Format contenu : - Si dynamic_to_email : 1ère ligne `TO: `, puis le message - Sinon : juste le message texte plat Si pct_confirmed=True, le message inclut le suffixe "(Prévenance PCT ok)". Le sous-dossier est créé s'il n'existe pas. Retourne {ok, path, detail} ou {ok: False, msg}.""" if not sp_base or not sp_route: return {"ok": False, "msg": "sharepoint_notif_path ou sp_route manquant"} body = format_message_text(server_name, msg_type, intervenant, pct_confirmed=pct_confirmed) if not body: return {"ok": False, "msg": f"msg_type inconnu: {msg_type}"} if dynamic_to_email: content = f"TO: {dynamic_to_email}\n{body}\n" else: content = body + "\n" try: notif_dir = os.path.join(sp_base, sp_route) os.makedirs(notif_dir, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") # Nettoyage hostname pour nom fichier safe_host = "".join(c if c.isalnum() or c in ("-", "_", ".") else "_" for c in (server_name or "unknown"))[:80] fname = f"{msg_type}_{safe_host}_{ts}.txt" fpath = os.path.join(notif_dir, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(content) log.info("Teams SP notif écrite: %s", fpath) return {"ok": True, "path": fpath, "detail": "Fichier écrit, en attente sync OneDrive"} except Exception as e: log.error("Échec écriture SP notif: %s", e) return {"ok": False, "msg": f"Erreur écriture: {e}"} def write_sharepoint_notification_async(sp_base: str, sp_route: str, msg_type: str, server_name: str, intervenant: str, dynamic_to_email: Optional[str] = None) -> None: """Variante non-bloquante : lance write_sharepoint_notification dans un thread.""" threading.Thread( target=write_sharepoint_notification, args=(sp_base, sp_route, msg_type, server_name, intervenant, dynamic_to_email), daemon=True, ).start() # ─────────────────────────────────────────────────────────────── # MODE WEBHOOK — POST direct (futur, OAuth requis) # ─────────────────────────────────────────────────────────────── def _proxies(): if PROXY_URL: return {"http": PROXY_URL, "https": PROXY_URL} return None def _post(webhook_url: str, payload: Dict[str, Any]) -> Dict[str, Any]: """Poste un payload sur le webhook Teams. Retourne {ok, status, detail}.""" try: r = requests.post(webhook_url, json=payload, timeout=HTTP_TIMEOUT, proxies=_proxies()) ok = (200 <= r.status_code < 300) return { "ok": ok, "status": r.status_code, "detail": (r.text or "")[:500] if not ok else "Message envoyé", } except requests.exceptions.RequestException as e: return {"ok": False, "status": 0, "detail": f"Réseau: {e}"} def _adaptive_card(title: str, body_lines: list, color: str = "good") -> Dict[str, Any]: """Construit un payload Adaptive Card minimal (compatible Workflows). color : 'good' (vert) | 'warning' (orange) | 'attention' (rouge) | 'default'.""" facts = [] body_blocks = [ {"type": "TextBlock", "text": title, "weight": "Bolder", "size": "Medium", "color": color, "wrap": True} ] for ln in body_lines: if isinstance(ln, dict) and "title" in ln and "value" in ln: facts.append({"title": ln["title"], "value": str(ln["value"])}) else: body_blocks.append({"type": "TextBlock", "text": str(ln), "wrap": True, "spacing": "Small"}) if facts: body_blocks.append({"type": "FactSet", "facts": facts}) body_blocks.append({"type": "TextBlock", "text": f"_{datetime.now().strftime('%Y-%m-%d %H:%M')} — PatchCenter_", "size": "Small", "isSubtle": True, "spacing": "Medium"}) return { "type": "message", "attachments": [{ "contentType": "application/vnd.microsoft.card.adaptive", "contentUrl": None, "content": { "$schema": "http://adaptivecards.io/schemas/adaptive-card.json", "type": "AdaptiveCard", "version": "1.4", "body": body_blocks, } }] } def resolve_channels_for_server(db, server_id: int, msg_type: str = "debut") -> Dict[str, Any]: """Résout la liste des canaux Teams cibles pour un serveur donné, selon le msg_type. Algorithme : 1. Si msg_type='reboot' → uniquement le canal `is_reboot_channel=true` (PAS de fan-out ; les reboots vont à un seul endroit, le canal général DSI). 2. servers.teams_channel_id (override explicite) → uniquement ce canal, pas de fan-out non plus (override = "je sais ce que je veux"). 3. Sinon : **fan-out** de toutes les règles actives qui matchent. Toutes les règles dont les conditions matchent contribuent un canal (multi-recipient : responsable + référents techniques + DBA + …). 4. Si aucune règle n'a matché → fallback canal `is_default=true`. Conditions de match d'une règle : - match_msg_type_in : msg_type doit être dans le tableau (NULL = tous) - match_responsable_contact_id : doit être == servers.responsable_domaine_contact_id - match_application_domain : substring insensible casse dans applications.domain - match_env_in : env du serveur dans le tableau (insensible casse) - match_hostname_pattern : substring insensible casse dans hostname - match_is_database_server : NULL = pas filtré ; true = ne matche que les DB ; false = ne matche que les non-DB Pour chaque canal `is_dynamic_dm=true` retenu, on calcule un `dynamic_to_email` distinct selon la nature de la règle (responsable/référent/DBA pool). Retourne { ok: bool, msg (si ok=False), server: {id, hostname, ...}, channels: list[ {channel_id, name, mode, sp_route, webhook_url, is_dynamic_dm, dynamic_to_email, source} ], } """ from sqlalchemy import text as sqlt if not server_id: return {"ok": False, "msg": "server_id manquant", "channels": []} def _channel_row(channel_id): return db.execute(sqlt(""" SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm FROM teams_channels WHERE id = :id AND is_active = true """), {"id": channel_id}).fetchone() def _contact_upn(contact_id): if not contact_id: return None c = db.execute(sqlt("SELECT teams_upn FROM contacts WHERE id = :id"), {"id": contact_id}).fetchone() return c.teams_upn if c and c.teams_upn else None s = db.execute(sqlt(""" SELECT s.id, s.hostname, s.environnement, s.application_id, s.responsable_domaine_contact_id, s.referent_technique_contact_id, s.teams_channel_id AS server_channel_id, COALESCE(s.is_database_server, false) AS is_db, a.domain AS app_domain FROM servers s LEFT JOIN applications a ON a.id = s.application_id WHERE s.id = :sid """), {"sid": server_id}).fetchone() if not s: return {"ok": False, "msg": f"Serveur id={server_id} introuvable", "channels": []} server_info = { "id": s.id, "hostname": s.hostname, "environnement": s.environnement, "domain": s.app_domain, "is_database_server": s.is_db, "responsable_domaine_contact_id": s.responsable_domaine_contact_id, "referent_technique_contact_id": s.referent_technique_contact_id, } # Référents additionnels (multi-référents) extra_referents = db.execute(sqlt(""" SELECT contact_id FROM server_additional_referents WHERE server_id = :sid """), {"sid": server_id}).fetchall() extra_ref_ids = [r.contact_id for r in extra_referents] def _build(ch, source, dynamic_to_email=None): return { "channel_id": ch.id, "name": ch.name, "mode": ch.mode, "sp_route": ch.sp_route, "webhook_url": ch.webhook_url, "is_dynamic_dm": ch.is_dynamic_dm, "dynamic_to_email": dynamic_to_email, "source": source, } # 1) Reboot → canal reboot uniquement if msg_type == "reboot": ch = db.execute(sqlt(""" SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm FROM teams_channels WHERE is_reboot_channel = true AND is_active = true ORDER BY id LIMIT 1 """)).fetchone() if ch: dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None return {"ok": True, "server": server_info, "channels": [_build(ch, "reboot", dyn)]} return {"ok": False, "msg": "Aucun canal reboot configuré (is_reboot_channel)", "server": server_info, "channels": []} # 2) Override par serveur (gagne sur tout le reste) if s.server_channel_id: ch = _channel_row(s.server_channel_id) if ch: dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None return {"ok": True, "server": server_info, "channels": [_build(ch, "server_override", dyn)]} # 3) Fan-out toutes les règles actives qui matchent rules = db.execute(sqlt(""" SELECT id, name, priority, channel_id, match_responsable_contact_id, match_application_domain, match_env_in, match_msg_type_in, match_hostname_pattern, match_is_database_server FROM teams_channel_rules WHERE is_active = true ORDER BY priority ASC, id ASC """)).fetchall() server_env = (s.environnement or "").strip() server_dom = (s.app_domain or "").strip() server_host = (s.hostname or "").lower() matched_channels = [] seen_channel_ids = set() for r in rules: if r.match_msg_type_in and msg_type not in r.match_msg_type_in: continue if r.match_responsable_contact_id and \ s.responsable_domaine_contact_id != r.match_responsable_contact_id: continue if r.match_application_domain: md = r.match_application_domain.strip().lower() if not server_dom or md not in server_dom.lower(): continue if r.match_env_in: envs_lower = [e.lower() for e in r.match_env_in] if not server_env or server_env.lower() not in envs_lower: continue if r.match_hostname_pattern: pat = r.match_hostname_pattern.strip().lower() if pat and pat not in server_host: continue if r.match_is_database_server is not None: if bool(r.match_is_database_server) != bool(s.is_db): continue # Toutes conditions matchent ch = _channel_row(r.channel_id) if not ch: continue if ch.id in seen_channel_ids: continue # dédoublonnage si plusieurs règles pointent sur même canal seen_channel_ids.add(ch.id) # Détermine le dynamic_to_email selon nature de la règle # Heuristique : si la règle matche un responsable_contact_id → DM ce contact # sinon si match_is_database_server=true → pool DBA (gérée côté workflow PA) # sinon → DM le responsable_domaine du serveur dyn = None if ch.is_dynamic_dm: if r.match_responsable_contact_id: dyn = _contact_upn(r.match_responsable_contact_id) elif r.match_is_database_server: # Pour DB : le workflow PA est censé connaître les destinataires # (Nadine + Cedric). On peut joindre les UPN via les référents # additionnels du serveur si renseignés, sinon vide → workflow décide. upns = [] if s.referent_technique_contact_id: u = _contact_upn(s.referent_technique_contact_id) if u: upns.append(u) for cid in extra_ref_ids: u = _contact_upn(cid) if u and u not in upns: upns.append(u) dyn = ",".join(upns) if upns else None else: dyn = _contact_upn(s.responsable_domaine_contact_id) matched_channels.append(_build(ch, f"rule:{r.name}", dyn)) if matched_channels: return {"ok": True, "server": server_info, "channels": matched_channels} # 4) Fallback default ch = db.execute(sqlt(""" SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm FROM teams_channels WHERE is_default = true AND is_active = true ORDER BY id LIMIT 1 """)).fetchone() if ch: dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None return {"ok": True, "server": server_info, "channels": [_build(ch, "default", dyn)]} return {"ok": False, "msg": "Aucun canal résolu (aucune règle ne matche, pas de canal défaut)", "server": server_info, "channels": []} # Compat : ancien nom singulier — retourne le 1er canal résolu (best-effort). # Conservé pour ne pas casser d'éventuels callers externes ; à terme à supprimer. def resolve_channel_for_server(db, server_id: int, msg_type: str = "debut") -> Dict[str, Any]: res = resolve_channels_for_server(db, server_id, msg_type) if not res.get("ok") or not res.get("channels"): return {"ok": False, "msg": res.get("msg", "Aucun canal résolu")} ch = res["channels"][0] return { "ok": True, "channel_id": ch["channel_id"], "name": ch["name"], "mode": ch["mode"], "sp_route": ch["sp_route"], "webhook_url": ch["webhook_url"], "is_dynamic_dm": ch["is_dynamic_dm"], "dynamic_to_email": ch["dynamic_to_email"], "source": ch["source"], } def send_notification(db, server_id: int, msg_type: str, intervenant: str, planning_row_id: Optional[int] = None) -> Dict[str, Any]: """Orchestration haut niveau : résout les canaux et envoie 1 notif par canal. Mode 'sharepoint' : écrit un fichier .txt par canal (fan-out multi-recipient). Mode 'webhook' : non implémenté tant qu'OAuth pas en place. Si planning_row_id est fourni, on lit pct_required + pct_confirmed_at sur la ligne pour ajouter le suffixe "(Prévenance PCT ok)" au message Teams le cas échéant. Retourne {ok, server, results: list[ {channel_name, source, ok, detail/path} ]}. """ from .secrets_service import get_secret res = resolve_channels_for_server(db, server_id, msg_type) if not res.get("ok"): return res from sqlalchemy import text as sqlt server_name = db.execute(sqlt("SELECT hostname FROM servers WHERE id = :id"), {"id": server_id}).scalar() or "unknown" pct_confirmed_flag = False if planning_row_id: row = db.execute(sqlt(""" SELECT pct_required, pct_confirmed_at FROM patch_planning_import_rows WHERE id = :id """), {"id": planning_row_id}).fetchone() if row and row.pct_required and row.pct_confirmed_at: pct_confirmed_flag = True sp_base = (get_secret(db, "sharepoint_notif_path") or "").strip() results = [] for ch in res["channels"]: if ch["mode"] == "sharepoint": if not sp_base: results.append({ "channel_name": ch["name"], "source": ch["source"], "ok": False, "msg": "sharepoint_notif_path non configuré dans Settings", }) continue out = write_sharepoint_notification( sp_base=sp_base, sp_route=ch["sp_route"], msg_type=msg_type, server_name=server_name, intervenant=intervenant, dynamic_to_email=ch.get("dynamic_to_email"), pct_confirmed=pct_confirmed_flag, ) out["channel_name"] = ch["name"] out["source"] = ch["source"] results.append(out) elif ch["mode"] == "webhook": results.append({ "channel_name": ch["name"], "source": ch["source"], "ok": False, "msg": "Mode webhook non implémenté (OAuth requis)", }) else: results.append({ "channel_name": ch["name"], "source": ch["source"], "ok": False, "msg": f"Mode inconnu: {ch['mode']}", }) n_ok = sum(1 for r in results if r.get("ok")) return { "ok": n_ok > 0, "server": res["server"], "results": results, "summary": f"{n_ok}/{len(results)} canaux notifiés", } # ─────────────────────────────────────────────────────────────── # Compat : helper webhook (à conserver pour le bouton "Tester" dans Settings) # ─────────────────────────────────────────────────────────────── def send_test_message(webhook_url: str, channel_name: str, sender: str) -> Dict[str, Any]: """Envoie un message de test pour valider la config du webhook.""" payload = _adaptive_card( title=f"✅ Test PatchCenter — canal '{channel_name}'", body_lines=[ "Ce message confirme que le webhook Teams est correctement configuré.", {"title": "Envoyé par", "value": sender}, {"title": "Source", "value": "PatchCenter > Settings > Teams"}, ], color="good", ) return _post(webhook_url, payload) def send_intervention_start(webhook_url: str, hostname: str, application: str, intervenant: str, planned_at: str = None) -> Dict[str, Any]: """Annonce le DÉBUT d'une intervention de patching.""" body = [ {"title": "Serveur", "value": hostname}, {"title": "Application", "value": application or "—"}, {"title": "Intervenant", "value": intervenant or "—"}, ] if planned_at: body.append({"title": "Prévu", "value": planned_at}) payload = _adaptive_card( title=f"🟧 Début intervention patching — {hostname}", body_lines=body, color="warning", ) return _post(webhook_url, payload) def send_intervention_end(webhook_url: str, hostname: str, application: str, intervenant: str, status: str = "ok") -> Dict[str, Any]: """Annonce la FIN d'une intervention. status = 'ok' | 'ko'.""" color = "good" if status == "ok" else "attention" icon = "✅" if status == "ok" else "❌" payload = _adaptive_card( title=f"{icon} Fin intervention patching — {hostname}", body_lines=[ {"title": "Serveur", "value": hostname}, {"title": "Application", "value": application or "—"}, {"title": "Intervenant", "value": intervenant or "—"}, {"title": "Statut technique", "value": status.upper()}, "_(En attente de validation par le responsable applicatif)_", ], color=color, ) return _post(webhook_url, payload) def send_planning_reminder(webhook_url: str, hostname: str, application: str, jour: str, heure: str, intervenant: str) -> Dict[str, Any]: """Rappel de planning (envoyable du jeudi/vendredi précédent jusqu'au jour J).""" payload = _adaptive_card( title=f"🗓 Rappel planning patching — {hostname}", body_lines=[ {"title": "Serveur", "value": hostname}, {"title": "Application", "value": application or "—"}, {"title": "Date prévue", "value": jour or "—"}, {"title": "Heure prévue", "value": heure or "—"}, {"title": "Intervenant", "value": intervenant or "—"}, "_Merci de confirmer la disponibilité applicative._", ], color="default", ) return _post(webhook_url, payload)