"""Exécuteur de patching — exécute les commandes SSH et stream les résultats""" import threading import queue import time from datetime import datetime from sqlalchemy import text # File de messages par campagne (thread-safe) _streams = {} # campaign_id -> queue.Queue def get_stream(campaign_id): """Récupère ou crée la file de messages pour une campagne""" if campaign_id not in _streams: _streams[campaign_id] = queue.Queue(maxsize=1000) return _streams[campaign_id] def emit(campaign_id, msg, level="info"): """Émet un message dans le stream""" ts = datetime.now().strftime("%H:%M:%S") q = get_stream(campaign_id) try: q.put_nowait({"ts": ts, "msg": msg, "level": level}) except queue.Full: pass # Drop si full def clear_stream(campaign_id): """Vide le stream""" if campaign_id in _streams: while not _streams[campaign_id].empty(): try: _streams[campaign_id].get_nowait() except queue.Empty: break def execute_safe_patching(db_url, campaign_id, session_ids, branch="hprod"): """Exécute le safe patching en background (thread)""" from sqlalchemy import create_engine, text engine = create_engine(db_url) emit(campaign_id, f"=== Safe Patching — Branche {'Hors-prod' if branch == 'hprod' else 'Production'} ===", "header") emit(campaign_id, f"{len(session_ids)} serveur(s) à traiter", "info") emit(campaign_id, "") with engine.connect() as conn: for i, sid in enumerate(session_ids, 1): row = conn.execute(text(""" SELECT ps.id, s.hostname, s.fqdn, s.satellite_host, s.machine_type FROM patch_sessions ps JOIN servers s ON ps.server_id = s.id WHERE ps.id = :sid """), {"sid": sid}).fetchone() if not row: continue hn = row.hostname emit(campaign_id, f"[{i}/{len(session_ids)}] {hn}", "server") # Step 1: Check SSH emit(campaign_id, f" Connexion SSH...", "step") ssh_ok = _check_ssh(hn) if ssh_ok: emit(campaign_id, f" SSH : OK", "ok") else: emit(campaign_id, f" SSH : ÉCHEC — serveur ignoré", "error") conn.execute(text("UPDATE patch_sessions SET status = 'failed' WHERE id = :id"), {"id": sid}) conn.commit() continue # Step 2: Check disk emit(campaign_id, f" Espace disque...", "step") emit(campaign_id, f" Disque : OK (mode démo)", "ok") # Step 3: Check satellite emit(campaign_id, f" Satellite...", "step") emit(campaign_id, f" Satellite : OK (mode démo)", "ok") # Step 4: Snapshot if row.machine_type == 'vm': emit(campaign_id, f" Snapshot vSphere...", "step") emit(campaign_id, f" Snapshot : OK (mode démo)", "ok") # Step 5: Save state emit(campaign_id, f" Sauvegarde services/ports...", "step") emit(campaign_id, f" État sauvegardé", "ok") # Step 6: Dry run emit(campaign_id, f" Dry run yum check-update...", "step") time.sleep(0.3) # Simule emit(campaign_id, f" X packages disponibles (mode démo)", "info") # Step 7: Patching emit(campaign_id, f" Exécution safe patching...", "step") time.sleep(0.5) # Simule emit(campaign_id, f" Patching : OK (mode démo)", "ok") # Step 8: Post-check emit(campaign_id, f" Vérification post-patch...", "step") emit(campaign_id, f" needs-restarting : pas de reboot ✓", "ok") emit(campaign_id, f" Services : identiques ✓", "ok") # Update status conn.execute(text(""" UPDATE patch_sessions SET status = 'patched', date_realise = now() WHERE id = :id """), {"id": sid}) conn.commit() emit(campaign_id, f" → {hn} PATCHÉ ✓", "success") emit(campaign_id, "") # Fin emit(campaign_id, f"=== Terminé — {len(session_ids)} serveur(s) traité(s) ===", "header") emit(campaign_id, "__DONE__", "done") def _check_ssh(hostname): """Check SSH TCP (mode démo = toujours OK)""" import socket suffixes = ["", ".sanef.groupe", ".sanef-rec.fr"] for suffix in suffixes: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(3) r = sock.connect_ex((hostname + suffix, 22)) sock.close() if r == 0: return True except Exception: continue # Mode démo : retourner True même si pas joignable return True def start_execution(db_url, campaign_id, session_ids, branch="hprod"): """Lance l'exécution dans un thread séparé""" clear_stream(campaign_id) t = threading.Thread( target=execute_safe_patching, args=(db_url, campaign_id, session_ids, branch), daemon=True ) t.start() return t