- Migration v2: ajoute teams_channel_rules.match_is_database_server (NULL=any/TRUE=DB only/FALSE=non-DB only), servers.is_database_server boolean (default false), table server_additional_referents pour multi-referents - Service teams_service.py: resolve_channel_for_server -> resolve_channels_for_server (pluriel, retourne LIST) * msg_type=reboot: 1 seul canal (canal flagge is_reboot_channel) * server.teams_channel_id override: 1 seul canal * sinon FAN-OUT: TOUTES les regles actives qui matchent contribuent un canal * dynamic_to_email calcule selon nature de la regle (responsable / DBA pool / referents) - send_notification: boucle sur les canaux resultants, ecrit 1 fichier SP par destination - UI settings.html: nouveau filtre 'Match serveur DB' dans formulaire regle, badge dans tableau - Compat: resolve_channel_for_server (singulier) conserve comme wrapper qui retourne le 1er canal Permet le scenario: serveur DB avec responsable=Delcour -> notif a la fois dans conv Delcour ET dans conv DBA (Nadine+Cedric), via 2 regles qui matchent en parallele.
511 lines
22 KiB
Python
511 lines
22 KiB
Python
"""Service Microsoft Teams — deux modes d'envoi :
|
|
|
|
1. **mode='sharepoint'** (recommandé pour SANEF) — écriture d'un fichier .txt dans un
|
|
sous-dossier OneDrive synchronisé. Un Workflow Power Automate côté SharePoint
|
|
surveille le dossier et poste sur Teams. Aucun OAuth requis. Format identique au
|
|
.exe Sanef Patch Manager pour réutiliser les Workflows existants.
|
|
|
|
2. **mode='webhook'** (futur, OAuth requis sur tenant SANEF) — POST direct sur
|
|
l'URL Power Automate Direct API avec un Bearer token Entra ID.
|
|
|
|
Le routage serveur → canal/conversation est géré par la table `teams_channel_rules`
|
|
(rules-based) et résolu par `resolve_channels_for_server` (fan-out : un serveur peut cibler plusieurs canaux).
|
|
|
|
Mode 'sharepoint' avec is_dynamic_dm=true : le fichier est préfixé par une ligne
|
|
`TO: <email>` que le Workflow PA lit pour poster dynamiquement à un destinataire,
|
|
sans avoir à créer un workflow par responsable.
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, List
|
|
|
|
import requests
|
|
|
|
# Module-level logger used by every helper in this service.
log = logging.getLogger("patchcenter.teams")

# Timeout handed to requests.post() for webhook calls.
HTTP_TIMEOUT = 10
# Proxy URL applied to both http and https when set; None means no proxy.
PROXY_URL = None  # override via Settings if needed
|
|
|
|
# ───────────────────────────────────────────────────────────────
|
|
# MODE SHAREPOINT — écriture fichier dans dossier OneDrive sync
|
|
# ───────────────────────────────────────────────────────────────
|
|
|
|
def format_message_text(server_name: str, msg_type: str, intervenant: str) -> str:
    """Build the flat text message consumed by the Power Automate -> Teams workflow.

    The operator name falls back to "SecOps" when empty and gets its first
    character upper-cased. Known message types ("debut", "reboot", "fin",
    "annulation") map to a fixed phrase; any other ``msg_type`` is inserted
    verbatim in place of the phrase.
    """
    who = intervenant or "SecOps"
    who = who[:1].upper() + who[1:]

    phrases = {
        "debut": "Debut d'intervention sur",
        "reboot": "Reboot suite MAJ de",
        "fin": "Fin d'intervention sur",
        "annulation": "Annulation intervention sur",
    }
    # Unknown types are echoed as-is, exactly like the generic fallback line.
    action = phrases.get(msg_type, msg_type)
    return f"[SECOPS] : Intervenant({who}) => {action} {server_name}"
|
|
|
|
|
|
def write_sharepoint_notification(sp_base: str, sp_route: str, msg_type: str,
                                  server_name: str, intervenant: str,
                                  dynamic_to_email: Optional[str] = None) -> Dict[str, Any]:
    """Write one notification ``.txt`` file into ``<sp_base>/<sp_route>/``.

    File name: ``<msg_type>_<server>_<YYYYMMDD_HHMMSS>.txt``. Both the message
    type and the server name are reduced to alphanumerics, ``-``, ``_`` and
    ``.`` so that neither can inject a path separator. When a file with that
    name already exists (same type/host within the same second), a numeric
    ``_<n>`` suffix is appended instead of overwriting the earlier file.

    File content:
      - with ``dynamic_to_email``: a first line ``TO: <email>`` followed by
        the message line;
      - otherwise: the message line only.

    The target sub-folder is created when missing.

    Args:
        sp_base: Root folder of the notification tree.
        sp_route: Sub-folder of ``sp_base`` for the target channel.
        msg_type: Message type, passed to ``format_message_text``.
        server_name: Host name shown in the message and used in the file name.
        intervenant: Operator name shown in the message.
        dynamic_to_email: Optional recipient written as the ``TO:`` header.

    Returns:
        ``{"ok": True, "path": ..., "detail": ...}`` on success, otherwise
        ``{"ok": False, "msg": ...}``. Errors are reported in the returned
        dict rather than raised.
    """
    if not sp_base or not sp_route:
        return {"ok": False, "msg": "sharepoint_notif_path ou sp_route manquant"}

    body = format_message_text(server_name, msg_type, intervenant)
    # Defensive guard: kept in case the formatter ever returns an empty string.
    if not body:
        return {"ok": False, "msg": f"msg_type inconnu: {msg_type}"}

    if dynamic_to_email:
        content = f"TO: {dynamic_to_email}\n{body}\n"
    else:
        content = body + "\n"

    def _safe_component(value: Any, fallback: str) -> str:
        """Return ``value`` restricted to file-name-safe characters (max 80)."""
        raw = str(value or fallback)
        return "".join(
            c if c.isalnum() or c in ("-", "_", ".") else "_" for c in raw
        )[:80]

    try:
        notif_dir = os.path.join(sp_base, sp_route)
        os.makedirs(notif_dir, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_type = _safe_component(msg_type, "unknown")
        safe_host = _safe_component(server_name, "unknown")
        stem = f"{safe_type}_{safe_host}_{ts}"

        # Exclusive creation ("x") refuses to overwrite an existing file, so a
        # second notification in the same second gets its own suffixed name.
        for attempt in range(100):
            suffix = f"_{attempt}" if attempt else ""
            fpath = os.path.join(notif_dir, f"{stem}{suffix}.txt")
            try:
                with open(fpath, "x", encoding="utf-8") as f:
                    f.write(content)
            except FileExistsError:
                continue
            log.info("Teams SP notif écrite: %s", fpath)
            return {"ok": True, "path": fpath,
                    "detail": "Fichier écrit, en attente sync OneDrive"}

        return {"ok": False,
                "msg": f"Erreur écriture: nom de fichier déjà utilisé ({stem})"}
    except Exception as e:  # best-effort boundary: report the failure, never raise
        log.error("Échec écriture SP notif: %s", e)
        return {"ok": False, "msg": f"Erreur écriture: {e}"}
|
|
|
|
|
|
def write_sharepoint_notification_async(sp_base: str, sp_route: str, msg_type: str,
                                        server_name: str, intervenant: str,
                                        dynamic_to_email: Optional[str] = None) -> None:
    """Non-blocking variant: run write_sharepoint_notification in a daemon thread.

    The result dict of the underlying call is discarded; nothing is returned.
    """
    worker = threading.Thread(
        target=write_sharepoint_notification,
        kwargs={
            "sp_base": sp_base,
            "sp_route": sp_route,
            "msg_type": msg_type,
            "server_name": server_name,
            "intervenant": intervenant,
            "dynamic_to_email": dynamic_to_email,
        },
        daemon=True,
    )
    worker.start()
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────
|
|
# MODE WEBHOOK — POST direct (futur, OAuth requis)
|
|
# ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _proxies():
    """Return the proxies mapping for requests, or None when PROXY_URL is unset."""
    if not PROXY_URL:
        return None
    # The same proxy URL serves both schemes.
    return dict.fromkeys(("http", "https"), PROXY_URL)
|
|
|
|
|
|
def _post(webhook_url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST a JSON payload to the Teams webhook.

    Returns ``{ok, status, detail}``. ``ok`` is true for any 2xx status. On a
    non-2xx answer ``detail`` carries the first 500 characters of the response
    body. A ``requests`` exception is reported with ``status`` 0.
    """
    try:
        response = requests.post(
            webhook_url,
            json=payload,
            timeout=HTTP_TIMEOUT,
            proxies=_proxies(),
        )
        code = response.status_code
        success = 200 <= code < 300
        if success:
            detail = "Message envoyé"
        else:
            detail = (response.text or "")[:500]
        return {"ok": success, "status": code, "detail": detail}
    except requests.exceptions.RequestException as exc:
        return {"ok": False, "status": 0, "detail": f"Réseau: {exc}"}
|
|
|
|
|
|
def _adaptive_card(title: str, body_lines: list, color: str = "good") -> Dict[str, Any]:
|
|
"""Construit un payload Adaptive Card minimal (compatible Workflows).
|
|
color : 'good' (vert) | 'warning' (orange) | 'attention' (rouge) | 'default'."""
|
|
facts = []
|
|
body_blocks = [
|
|
{"type": "TextBlock", "text": title, "weight": "Bolder",
|
|
"size": "Medium", "color": color, "wrap": True}
|
|
]
|
|
for ln in body_lines:
|
|
if isinstance(ln, dict) and "title" in ln and "value" in ln:
|
|
facts.append({"title": ln["title"], "value": str(ln["value"])})
|
|
else:
|
|
body_blocks.append({"type": "TextBlock", "text": str(ln), "wrap": True,
|
|
"spacing": "Small"})
|
|
if facts:
|
|
body_blocks.append({"type": "FactSet", "facts": facts})
|
|
body_blocks.append({"type": "TextBlock",
|
|
"text": f"_{datetime.now().strftime('%Y-%m-%d %H:%M')} — PatchCenter_",
|
|
"size": "Small", "isSubtle": True, "spacing": "Medium"})
|
|
return {
|
|
"type": "message",
|
|
"attachments": [{
|
|
"contentType": "application/vnd.microsoft.card.adaptive",
|
|
"contentUrl": None,
|
|
"content": {
|
|
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
|
|
"type": "AdaptiveCard",
|
|
"version": "1.4",
|
|
"body": body_blocks,
|
|
}
|
|
}]
|
|
}
|
|
|
|
|
|
def resolve_channels_for_server(db, server_id: int, msg_type: str = "debut") -> Dict[str, Any]:
    """Resolve the list of Teams targets to notify for a server and message type.

    Resolution order:
      1. ``msg_type == "reboot"``: only the active channel flagged
         ``is_reboot_channel`` (lowest id). No fan-out; fails when none exists.
      2. ``servers.teams_channel_id`` pointing to an active channel: only that
         channel (explicit per-server override, no fan-out).
      3. Otherwise fan-out: every active rule whose conditions all match
         contributes one target, in ``priority, id`` order.
      4. When no rule produced a target: the active ``is_default`` channel.

    Rule conditions (an empty/NULL condition is not filtered on):
      - ``match_msg_type_in``: ``msg_type`` must be in the array.
      - ``match_responsable_contact_id``: must equal the server's
        ``responsable_domaine_contact_id``.
      - ``match_application_domain``: case-insensitive substring of the
        application's domain.
      - ``match_env_in``: server environment in the array (case-insensitive).
      - ``match_hostname_pattern``: case-insensitive substring of the hostname.
      - ``match_is_database_server``: NULL = any, true = DB servers only,
        false = non-DB servers only.

    Recipient (``dynamic_to_email``), computed only for ``is_dynamic_dm``
    channels:
      - rule with ``match_responsable_contact_id``: that contact's UPN;
      - rule with ``match_is_database_server`` true: comma-joined UPNs of the
        server's technical referent and additional referents (None when no
        UPN is known);
      - any other rule, and the reboot/override/default paths: UPN of the
        server's ``responsable_domaine`` contact.

    De-duplication is done on the pair (channel id, recipient). Several rules
    hitting the same static channel still yield a single entry, while rules
    sharing one dynamic-DM channel but addressing different recipients each
    keep their entry. Consumers producing one output per entry should expect
    several entries with the same ``sp_route`` in that case.

    Args:
        db: Object exposing ``execute(statement, params)``; used with
            SQLAlchemy ``text`` statements.
        server_id: Primary key of the server in ``servers``.
        msg_type: Notification type (e.g. "debut", "reboot", "fin").

    Returns:
        ``{"ok": bool, "msg": str (only when ok is False), "server": {...},
        "channels": [{channel_id, name, mode, sp_route, webhook_url,
        is_dynamic_dm, dynamic_to_email, source}, ...]}``. ``server`` is
        absent when the server id is missing or unknown.
    """
    from sqlalchemy import text as sqlt

    if not server_id:
        return {"ok": False, "msg": "server_id manquant", "channels": []}

    # Per-call memoisation: the same channel / contact is typically looked up
    # once per matching rule, so cache the rows instead of re-querying.
    channel_cache = {}
    upn_cache = {}

    def _channel_row(channel_id):
        """Return the active channel row for ``channel_id`` (None if absent/inactive)."""
        if channel_id not in channel_cache:
            channel_cache[channel_id] = db.execute(sqlt("""
                SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
                FROM teams_channels
                WHERE id = :id AND is_active = true
            """), {"id": channel_id}).fetchone()
        return channel_cache[channel_id]

    def _contact_upn(contact_id):
        """Return the contact's ``teams_upn`` or None when unknown/empty."""
        if not contact_id:
            return None
        if contact_id not in upn_cache:
            c = db.execute(sqlt("SELECT teams_upn FROM contacts WHERE id = :id"),
                           {"id": contact_id}).fetchone()
            upn_cache[contact_id] = c.teams_upn if c and c.teams_upn else None
        return upn_cache[contact_id]

    s = db.execute(sqlt("""
        SELECT s.id, s.hostname, s.environnement, s.application_id,
               s.responsable_domaine_contact_id, s.referent_technique_contact_id,
               s.teams_channel_id AS server_channel_id,
               COALESCE(s.is_database_server, false) AS is_db,
               a.domain AS app_domain
        FROM servers s
        LEFT JOIN applications a ON a.id = s.application_id
        WHERE s.id = :sid
    """), {"sid": server_id}).fetchone()
    if not s:
        return {"ok": False, "msg": f"Serveur id={server_id} introuvable", "channels": []}

    server_info = {
        "id": s.id, "hostname": s.hostname, "environnement": s.environnement,
        "domain": s.app_domain, "is_database_server": s.is_db,
        "responsable_domaine_contact_id": s.responsable_domaine_contact_id,
        "referent_technique_contact_id": s.referent_technique_contact_id,
    }

    # Additional referents (multi-referent support).
    extra_referents = db.execute(sqlt("""
        SELECT contact_id FROM server_additional_referents WHERE server_id = :sid
    """), {"sid": server_id}).fetchall()
    extra_ref_ids = [r.contact_id for r in extra_referents]

    def _build(ch, source, dynamic_to_email=None):
        """Shape one entry of the returned ``channels`` list."""
        return {
            "channel_id": ch.id, "name": ch.name,
            "mode": ch.mode, "sp_route": ch.sp_route,
            "webhook_url": ch.webhook_url,
            "is_dynamic_dm": ch.is_dynamic_dm,
            "dynamic_to_email": dynamic_to_email,
            "source": source,
        }

    # 1) Reboot -> reboot channel only
    if msg_type == "reboot":
        ch = db.execute(sqlt("""
            SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
            FROM teams_channels
            WHERE is_reboot_channel = true AND is_active = true
            ORDER BY id LIMIT 1
        """)).fetchone()
        if ch:
            dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None
            return {"ok": True, "server": server_info,
                    "channels": [_build(ch, "reboot", dyn)]}
        return {"ok": False, "msg": "Aucun canal reboot configuré (is_reboot_channel)",
                "server": server_info, "channels": []}

    # 2) Per-server override (wins over rules). An inactive/missing override
    #    channel falls through to the rules below.
    if s.server_channel_id:
        ch = _channel_row(s.server_channel_id)
        if ch:
            dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None
            return {"ok": True, "server": server_info,
                    "channels": [_build(ch, "server_override", dyn)]}

    # 3) Fan-out over every active rule that matches
    rules = db.execute(sqlt("""
        SELECT id, name, priority, channel_id,
               match_responsable_contact_id, match_application_domain,
               match_env_in, match_msg_type_in, match_hostname_pattern,
               match_is_database_server
        FROM teams_channel_rules
        WHERE is_active = true
        ORDER BY priority ASC, id ASC
    """)).fetchall()

    server_env = (s.environnement or "").strip()
    server_dom = (s.app_domain or "").strip()
    server_host = (s.hostname or "").lower()

    matched_channels = []
    seen_targets = set()  # (channel id, dynamic_to_email) pairs already emitted

    for r in rules:
        if r.match_msg_type_in and msg_type not in r.match_msg_type_in:
            continue
        if (r.match_responsable_contact_id
                and s.responsable_domaine_contact_id != r.match_responsable_contact_id):
            continue
        if r.match_application_domain:
            md = r.match_application_domain.strip().lower()
            if not server_dom or md not in server_dom.lower():
                continue
        if r.match_env_in:
            envs_lower = [e.lower() for e in r.match_env_in]
            if not server_env or server_env.lower() not in envs_lower:
                continue
        if r.match_hostname_pattern:
            pat = r.match_hostname_pattern.strip().lower()
            if pat and pat not in server_host:
                continue
        if r.match_is_database_server is not None:
            if bool(r.match_is_database_server) != bool(s.is_db):
                continue

        # All conditions match
        ch = _channel_row(r.channel_id)
        if not ch:
            continue

        # Pick the recipient according to the nature of the rule (see docstring).
        dyn = None
        if ch.is_dynamic_dm:
            if r.match_responsable_contact_id:
                dyn = _contact_upn(r.match_responsable_contact_id)
            elif r.match_is_database_server:
                # DB rule: address the server's referents when their UPNs are
                # known; otherwise leave the recipient empty.
                upns = []
                for cid in [s.referent_technique_contact_id, *extra_ref_ids]:
                    u = _contact_upn(cid)
                    if u and u not in upns:
                        upns.append(u)
                dyn = ",".join(upns) if upns else None
            else:
                dyn = _contact_upn(s.responsable_domaine_contact_id)

        # De-duplicate on (channel, recipient): a static channel always has
        # dyn=None and is emitted once; a shared dynamic-DM channel keeps one
        # entry per distinct recipient instead of dropping later recipients.
        target = (ch.id, dyn)
        if target in seen_targets:
            continue
        seen_targets.add(target)

        matched_channels.append(_build(ch, f"rule:{r.name}", dyn))

    if matched_channels:
        return {"ok": True, "server": server_info, "channels": matched_channels}

    # 4) Fallback: default channel
    ch = db.execute(sqlt("""
        SELECT id, name, mode, sp_route, webhook_url, is_dynamic_dm
        FROM teams_channels
        WHERE is_default = true AND is_active = true
        ORDER BY id LIMIT 1
    """)).fetchone()
    if ch:
        dyn = _contact_upn(s.responsable_domaine_contact_id) if ch.is_dynamic_dm else None
        return {"ok": True, "server": server_info,
                "channels": [_build(ch, "default", dyn)]}

    return {"ok": False, "msg": "Aucun canal résolu (aucune règle ne matche, pas de canal défaut)",
            "server": server_info, "channels": []}
|
|
|
|
|
|
# Compat : ancien nom singulier — retourne le 1er canal résolu (best-effort).
|
|
# Conservé pour ne pas casser d'éventuels callers externes ; à terme à supprimer.
|
|
def resolve_channel_for_server(db, server_id: int, msg_type: str = "debut") -> Dict[str, Any]:
    """Backward-compatible singular resolver: return only the first resolved channel.

    Delegates to resolve_channels_for_server and flattens its first entry into
    the returned dict. On failure (or an empty channel list) returns
    ``{"ok": False, "msg": ...}``.
    """
    resolved = resolve_channels_for_server(db, server_id, msg_type)
    channels = resolved.get("channels")
    if not (resolved.get("ok") and channels):
        return {"ok": False, "msg": resolved.get("msg", "Aucun canal résolu")}

    first = channels[0]
    copied_keys = (
        "channel_id", "name", "mode", "sp_route",
        "webhook_url", "is_dynamic_dm", "dynamic_to_email", "source",
    )
    flattened = {"ok": True}
    flattened.update((key, first[key]) for key in copied_keys)
    return flattened
|
|
|
|
|
|
def send_notification(db, server_id: int, msg_type: str, intervenant: str) -> Dict[str, Any]:
    """High-level orchestration: resolve the channels, then send one notification each.

    Mode 'sharepoint': writes one .txt file per resolved channel (fan-out).
    Mode 'webhook': reported as not implemented (OAuth required).
    Any other mode is reported as unknown.

    Args:
        db: Database handle passed through to the resolver and ``get_secret``.
        server_id: Primary key of the server being notified about.
        msg_type: Notification type (e.g. "debut", "reboot", "fin").
        intervenant: Operator name shown in the message.

    Returns:
        On success or partial success:
        ``{"ok": bool, "server": {...}, "results": [...], "summary": str}``
        where ``ok`` is true as soon as one channel succeeded and each result
        carries ``channel_name``, ``source``, ``ok`` plus ``path``/``detail``
        or ``msg``.
        When channel resolution fails: the resolver's failure dict
        (``ok=False``, ``msg``, ``channels``...) extended with an empty
        ``results`` list so the shape stays uniform for callers.
    """
    from .secrets_service import get_secret

    res = resolve_channels_for_server(db, server_id, msg_type)
    if not res.get("ok"):
        # Same payload as the resolver, plus the documented "results" key.
        return {**res, "results": []}

    # The resolver already loaded the hostname; no need to query it again.
    server_name = (res.get("server") or {}).get("hostname") or "unknown"

    sp_base = (get_secret(db, "sharepoint_notif_path") or "").strip()

    results = []
    for ch in res["channels"]:
        if ch["mode"] == "sharepoint":
            if not sp_base:
                results.append({
                    "channel_name": ch["name"], "source": ch["source"],
                    "ok": False, "msg": "sharepoint_notif_path non configuré dans Settings",
                })
                continue
            out = write_sharepoint_notification(
                sp_base=sp_base, sp_route=ch["sp_route"], msg_type=msg_type,
                server_name=server_name, intervenant=intervenant,
                dynamic_to_email=ch.get("dynamic_to_email"),
            )
            out["channel_name"] = ch["name"]
            out["source"] = ch["source"]
            results.append(out)
        elif ch["mode"] == "webhook":
            results.append({
                "channel_name": ch["name"], "source": ch["source"],
                "ok": False, "msg": "Mode webhook non implémenté (OAuth requis)",
            })
        else:
            results.append({
                "channel_name": ch["name"], "source": ch["source"],
                "ok": False, "msg": f"Mode inconnu: {ch['mode']}",
            })

    n_ok = sum(1 for r in results if r.get("ok"))
    return {
        "ok": n_ok > 0,
        "server": res["server"],
        "results": results,
        "summary": f"{n_ok}/{len(results)} canaux notifiés",
    }
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────
|
|
# Compat : helper webhook (à conserver pour le bouton "Tester" dans Settings)
|
|
# ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def send_test_message(webhook_url: str, channel_name: str, sender: str) -> Dict[str, Any]:
    """Post a test card to the webhook to validate its configuration.

    Returns the ``{ok, status, detail}`` dict produced by ``_post``.
    """
    lines = [
        "Ce message confirme que le webhook Teams est correctement configuré.",
        {"title": "Envoyé par", "value": sender},
        {"title": "Source", "value": "PatchCenter > Settings > Teams"},
    ]
    card = _adaptive_card(
        title=f"✅ Test PatchCenter — canal '{channel_name}'",
        body_lines=lines,
        color="good",
    )
    return _post(webhook_url, card)
|
|
|
|
|
|
def send_intervention_start(webhook_url: str, hostname: str, application: str,
                            intervenant: str, planned_at: str = None) -> Dict[str, Any]:
    """Announce the START of a patching intervention.

    Empty ``application`` / ``intervenant`` values are shown as "—"; the
    "Prévu" fact is only added when ``planned_at`` is provided. Returns the
    ``{ok, status, detail}`` dict produced by ``_post``.
    """
    facts = [
        {"title": "Serveur", "value": hostname},
        {"title": "Application", "value": application or "—"},
        {"title": "Intervenant", "value": intervenant or "—"},
    ]
    if planned_at:
        facts = facts + [{"title": "Prévu", "value": planned_at}]

    card = _adaptive_card(
        title=f"🟧 Début intervention patching — {hostname}",
        body_lines=facts,
        color="warning",
    )
    return _post(webhook_url, card)
|
|
|
|
|
|
def send_intervention_end(webhook_url: str, hostname: str, application: str,
                          intervenant: str, status: str = "ok") -> Dict[str, Any]:
    """Announce the END of a patching intervention.

    Args:
        webhook_url: Target webhook URL.
        hostname: Server name shown in the title and facts.
        application: Application name ("—" when empty).
        intervenant: Operator name ("—" when empty).
        status: Technical outcome, 'ok' or 'ko'. Compared case-insensitively
            and ignoring surrounding whitespace; anything other than 'ok'
            (including None/empty) is rendered as a failure.

    Returns:
        The ``{ok, status, detail}`` dict produced by ``_post``.
    """
    # Normalise once so that "OK" / " ok " are not shown as a failure and a
    # missing status does not crash on .upper().
    normalized = (status or "").strip().lower()
    succeeded = normalized == "ok"
    color = "good" if succeeded else "attention"
    icon = "✅" if succeeded else "❌"
    payload = _adaptive_card(
        title=f"{icon} Fin intervention patching — {hostname}",
        body_lines=[
            {"title": "Serveur", "value": hostname},
            {"title": "Application", "value": application or "—"},
            {"title": "Intervenant", "value": intervenant or "—"},
            {"title": "Statut technique", "value": normalized.upper() or "—"},
            "_(En attente de validation par le responsable applicatif)_",
        ],
        color=color,
    )
    return _post(webhook_url, payload)
|
|
|
|
|
|
def send_planning_reminder(webhook_url: str, hostname: str, application: str,
                           jour: str, heure: str, intervenant: str) -> Dict[str, Any]:
    """Post a planning reminder card for an upcoming patching intervention.

    Empty application/date/time/operator values are shown as "—". Returns the
    ``{ok, status, detail}`` dict produced by ``_post``.
    """
    rows = (
        ("Serveur", hostname),
        ("Application", application or "—"),
        ("Date prévue", jour or "—"),
        ("Heure prévue", heure or "—"),
        ("Intervenant", intervenant or "—"),
    )
    lines = [{"title": label, "value": value} for label, value in rows]
    lines.append("_Merci de confirmer la disponibilité applicative._")

    card = _adaptive_card(
        title=f"🗓 Rappel planning patching — {hostname}",
        body_lines=lines,
        color="default",
    )
    return _post(webhook_url, card)
|