#!/usr/bin/env python3 # ============================================================================== # patch_manager_v2.py — SANEF Linux Patch Manager GUI v2 # Gestion complète du patching Linux via Excel + SSH + vSphere # # Prérequis : py -m pip install paramiko openpyxl requests pyVmomi --proxy http://proxy.sanef.fr:8080 # Usage : py patch_manager_v2.py # Auteur : MYPCZEN / SANEF DSI - SOC # ============================================================================== import tkinter as tk from tkinter import ttk, filedialog, scrolledtext, messagebox, simpledialog import threading import paramiko import socket import time import csv import os import sys import json from datetime import datetime, date from copy import deepcopy import sqlite3 import hashlib import secrets import ctypes # openpyxl optionnel — installé si absent try: import openpyxl except ImportError: os.system("py -m pip install openpyxl --proxy http://proxy.sanef.fr:8080 -q") import openpyxl # pyVmomi optionnel — pour vSphere try: from pyVim.connect import SmartConnect, Disconnect from pyVmomi import vim VSPHERE_OK = True except ImportError: VSPHERE_OK = False # ============================================================================== # CONSTANTES # ============================================================================== VERSION = "2.0" VSPHERE_HOSTS = [ "vpgesavcs1.sanef.groupe", "vpmetavcs1.sanef.groupe", "vpsicavcs1.sanef.groupe", ] # Ordre vCenter selon le prefixe du hostname VCENTER_METIER = "vpmetavcs1.sanef.groupe" VCENTER_GESTION = "vpgesavcs1.sanef.groupe" VCENTER_SICA = "vpsicavcs1.sanef.groupe" PREFIXES_METIER = ("vp", "vi", "sp") PREFIXES_SICA = ("si",) def get_vcenter_order_for_server(server_name): """Retourne la liste des vCenters ordonnee selon le prefixe du hostname. vp*, vi*, sp* → metier first ; si* → sica first ; autres → gestion first""" prefix = server_name[:2].lower() if len(server_name) >= 2 else "" if prefix in PREFIXES_SICA: return [VCENTER_SICA, VCENTER_METIER, VCENTER_GESTION] elif prefix in PREFIXES_METIER: return [VCENTER_METIER, VCENTER_GESTION, VCENTER_SICA] else: return [VCENTER_GESTION, VCENTER_METIER, VCENTER_SICA] YUM_EXCLUDES_STD = ( "--exclude=*mongodb* --exclude=*mysql* --exclude=*postgres* " "--exclude=*mariadb* --exclude=*oracle* --exclude=*pgdg* --exclude=*php* " "--exclude=*java* --exclude=*redis* --exclude=*elasticsearch* --exclude=*nginx* " "--exclude=*mod_ssl* --exclude=*haproxy* --exclude=*certbot* " "--exclude=*python-certbot* --exclude=*docker* --exclude=*podman* " "--exclude=*centreon* --exclude=*qwserver* " "--exclude=*ansible* --exclude=*node* --exclude=*tina* --exclude=*memcached* " "--exclude=*nextcloud* --exclude=*pgbouncer* --exclude=*pgpool* " "--exclude=*pgbadger* --exclude=*psycopg2* --exclude=*barman* --exclude=*kibana* " "--exclude=*sdcss*" # Symantec DCS agent + sdcss-kmod kernel module ) YUM_EXCLUDES_FLUX_LIBRE = "--exclude=*podman*" PRE_PATCH_SCRIPT = r""" cat > /tmp/secops_pre_patching.sh << 'EOF' HOSTNAME=$(hostname) SNAPSHOT_DIR="/tmp" systemctl list-units --type=service --state=running --no-pager \ | awk '{print $1}' | grep '\.service$' \ > ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt echo "Services sauvegardes : $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt)" ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt ss -tlnup | awk 'NR>1 && $7 != "" { match($7, /users:\(\("([^"]+)"/, arr) split($5, addr, ":") port = addr[length(addr)] if (arr[1] != "" && port+0 < 32768) print port, arr[1] }' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt echo "Ports sauvegardes : $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt)" echo "PRE_PATCH_OK" EOF bash /tmp/secops_pre_patching.sh """ POST_PATCH_SCRIPT = r""" cat > /tmp/secops_post_patching.sh << 'EOF' HOSTNAME=$(hostname) SNAPSHOT_DIR="/tmp" GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m' RAPPORT="/tmp/rapport_patching_${HOSTNAME}_$(date +%Y%m%d_%H%M).txt" systemctl list-units --type=service --state=running --no-pager \ | awk '{print $1}' | grep '\.service$' \ > ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt DISPARUS_SVC=$(comm -23 \ <(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \ | grep -v "user@") APPARUS_SVC=$(comm -13 \ <(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \ | grep -v "setroubleshootd\|user@") echo "--- SERVICES ---" | tee -a ${RAPPORT} echo "Avant: $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) | Apres: $(wc -l < ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT} [ -z "$DISPARUS_SVC" ] && echo "OK - Aucun service disparu" | tee -a ${RAPPORT} || { echo "ALERTE services disparus:" | tee -a ${RAPPORT}; echo "$DISPARUS_SVC" | tee -a ${RAPPORT}; } [ -z "$APPARUS_SVC" ] && echo "OK - Aucun nouveau service" | tee -a ${RAPPORT} || { echo "WARN nouveaux services:" | tee -a ${RAPPORT}; echo "$APPARUS_SVC" | tee -a ${RAPPORT}; } ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt ss -tlnup | awk 'NR>1 && $7 != "" { match($7, /users:\(\("([^"]+)"/, arr) split($5, addr, ":") port = addr[length(addr)] if (arr[1] != "" && port+0 < 32768) print port, arr[1] }' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt PORTS_DISPARUS=$(comm -23 \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt)) PORTS_APPARUS=$(comm -13 \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt)) echo "--- PORTS ---" | tee -a ${RAPPORT} echo "Avant: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt) | Apres: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT} [ -z "$PORTS_DISPARUS" ] && echo "OK - Aucun port disparu" | tee -a ${RAPPORT} || { echo "ALERTE ports disparus:" | tee -a ${RAPPORT}; echo "$PORTS_DISPARUS" | tee -a ${RAPPORT}; } [ -z "$PORTS_APPARUS" ] && echo "OK - Aucun nouveau port" | tee -a ${RAPPORT} || { echo "WARN nouveaux ports:" | tee -a ${RAPPORT}; echo "$PORTS_APPARUS" | tee -a ${RAPPORT}; } echo "" | tee -a ${RAPPORT} echo "RAPPORT: ${RAPPORT}" echo "POST_PATCH_OK" EOF bash /tmp/secops_post_patching.sh """ FL_PRE_PATCH_SCRIPT = r""" cat > /tmp/secops_fl_pre_patch.sh << 'FLEOF' # Detecter l'utilisateur applicatif podman APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1) if [ -z "$APP_USER" ]; then echo "FL_NO_PODS" exit 0 fi echo "FL_USER=$APP_USER" # Snapshot pods running via sudo su sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}" --filter "status=running"' \ > /tmp/fl_pods_avant_$(hostname)_$(date +%Y%m%d_%H%M).txt echo "FL_PODS_SAVED=$(wc -l < /tmp/fl_pods_avant_$(hostname)_*.txt | tail -1)" # Status boo_manage si disponible if command -v boo_manage &>/dev/null || sudo su - $APP_USER -c "which boo_manage" &>/dev/null; then sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true fi echo "FL_PRE_OK" FLEOF bash /tmp/secops_fl_pre_patch.sh """ FL_POST_PATCH_SCRIPT = r""" cat > /tmp/secops_fl_post_patch.sh << 'FLEOF' APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1) if [ -z "$APP_USER" ]; then echo "FL_NO_PODS" exit 0 fi echo "FL_USER=$APP_USER" SNAPSHOT=$(ls -t /tmp/fl_pods_avant_$(hostname)_*.txt 2>/dev/null | head -1) if [ -z "$SNAPSHOT" ]; then echo "FL_NO_SNAPSHOT" sudo su - $APP_USER -c "boo_manage -a -c start" 2>/dev/null || true exit 0 fi AVANT=$(cat $SNAPSHOT) APRES=$(sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}"') ARRETES=0 while IFS= read -r pod; do if ! echo "$APRES" | grep -q "^${pod}$"; then echo "FL_RESTART=$pod" sudo su - $APP_USER -c "boo_manage -p ${pod} -c start" 2>/dev/null || \ sudo su - $APP_USER -c "XDG_RUNTIME_DIR=/run/user/\$(id -u) podman start ${pod}" 2>/dev/null ARRETES=$((ARRETES + 1)) fi done <<< "$AVANT" if [ $ARRETES -eq 0 ]; then echo "FL_ALL_OK" else echo "FL_RESTARTED=$ARRETES" fi sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true echo "FL_POST_OK" FLEOF bash /tmp/secops_fl_post_patch.sh """ PRESET_PACKAGES = { "Patch global (avec excludes)": "", "Java": "java-1.8.0-openjdk java-1.8.0-openjdk-headless", "OpenSSL": "openssl", "OpenSSH": "openssh openssh-server openssh-clients", "gnutls + libsoup + rhc": "gnutls libsoup rhc", "glibc": "glibc", "mariadb-libs": "mariadb-libs", "curl": "curl libcurl", "Personnalise...": "CUSTOM", } SETTINGS_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_settings.json") DB_PATH = os.path.join(os.path.expanduser("~"), ".patch_manager.db") # ============================================================================== # DATABASE — Auth + Audit # ============================================================================== _db_lock = threading.Lock() def _hash_password(password, salt=None): if salt is None: salt = secrets.token_hex(16) h = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 310_000) return f"{salt}${h.hex()}" def _verify_password(password, stored): if "$" not in stored: return False salt, _ = stored.split("$", 1) return _hash_password(password, salt) == stored class Database: def __init__(self): self.conn = sqlite3.connect(DB_PATH, check_same_thread=False) self.conn.row_factory = sqlite3.Row self._init_tables() def _exec(self, sql, params=()): with _db_lock: return self.conn.execute(sql, params) def _commit(self): with _db_lock: self.conn.commit() def _init_tables(self): with _db_lock: self.conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'operator', must_change_pwd INTEGER NOT NULL DEFAULT 1, failed_attempts INTEGER NOT NULL DEFAULT 0, locked INTEGER NOT NULL DEFAULT 0, created_at TEXT DEFAULT (datetime('now','localtime')), last_login TEXT ); CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT DEFAULT (datetime('now','localtime')), username TEXT, action TEXT NOT NULL, details TEXT ); """) # Compte admin par defaut — mot de passe aléatoire unique row = self.conn.execute("SELECT id FROM users WHERE username='admin'").fetchone() if not row: init_pwd = secrets.token_urlsafe(16) h = _hash_password(init_pwd) self.conn.execute( "INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,?)", ("admin", h, "admin", 1)) self._init_password = init_pwd # Affiché une seule fois au login self.conn.commit() def authenticate(self, username, password): row = self._exec("SELECT * FROM users WHERE LOWER(username)=LOWER(?)", (username,)).fetchone() if not row: return None, "Utilisateur inconnu" if row["locked"]: return None, "Compte verrouille (3 tentatives). Contactez l'admin." if not _verify_password(password, row["password_hash"]): attempts = row["failed_attempts"] + 1 if attempts >= 3: self._exec("UPDATE users SET failed_attempts=?, locked=1 WHERE id=?", (attempts, row["id"])) else: self._exec("UPDATE users SET failed_attempts=? WHERE id=?", (attempts, row["id"])) self._commit() remaining = 3 - attempts if remaining <= 0: return None, "Compte verrouille apres 3 echecs." return None, f"Mot de passe incorrect ({remaining} essai(s) restant(s))" # Succes self._exec("UPDATE users SET failed_attempts=0, last_login=datetime('now','localtime') WHERE id=?", (row["id"],)) self._commit() return dict(row), None def change_password(self, username, new_password): h = _hash_password(new_password) self._exec("UPDATE users SET password_hash=?, must_change_pwd=0 WHERE username=?", (h, username)) self._commit() def create_user(self, username, password, role="operator"): try: h = _hash_password(password) self._exec("INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,1)", (username, h, role)) self._commit() return True except sqlite3.IntegrityError: return False def delete_user(self, username): self._exec("DELETE FROM users WHERE username=? AND username != 'admin'", (username,)) self._commit() def unlock_user(self, username): self._exec("UPDATE users SET locked=0, failed_attempts=0 WHERE username=?", (username,)) self._commit() def reset_password(self, username, new_password): h = _hash_password(new_password) self._exec("UPDATE users SET password_hash=?, must_change_pwd=1, locked=0, failed_attempts=0 WHERE username=?", (h, username)) self._commit() def list_users(self): return [dict(r) for r in self._exec("SELECT * FROM users ORDER BY username").fetchall()] def log_action(self, username, action, details=""): self._exec("INSERT INTO audit_log (username, action, details) VALUES (?,?,?)", (username, action, details)) self._commit() def get_logs(self, limit=500): return [dict(r) for r in self._exec( "SELECT * FROM audit_log ORDER BY id DESC LIMIT ?", (limit,)).fetchall()] # ============================================================================== # SPLUNK HEC — envoi de logs vers Splunk # ============================================================================== def send_to_splunk(settings, event_data): """Envoie un evenement vers Splunk via HEC (HTTP Event Collector). settings doit contenir: splunk_url, splunk_token, splunk_index, splunk_enabled event_data: dict avec les champs a envoyer""" if not settings.get("splunk_enabled") or settings.get("splunk_enabled") == "false": return url = settings.get("splunk_url", "").strip() token = settings.get("_splunk_token_session", settings.get("splunk_token", "")).strip() if not url or not token: return try: import urllib.request import ssl payload = json.dumps({ "index": settings.get("splunk_index", "main"), "sourcetype": "patch_manager", "source": "sanef_patch_manager_v2", "host": socket.gethostname(), "event": event_data, }).encode("utf-8") req = urllib.request.Request( url.rstrip("/") + "/services/collector/event", data=payload, headers={ "Authorization": f"Splunk {token}", "Content-Type": "application/json", }, method="POST") # Proxy SANEF si configure proxy_url = settings.get("proxy_url", "") if proxy_url: handler = urllib.request.ProxyHandler({"https": proxy_url, "http": proxy_url}) opener = urllib.request.build_opener(handler) else: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_peer = False handler = urllib.request.HTTPSHandler(context=ctx) opener = urllib.request.build_opener(handler) opener.open(req, timeout=5) except Exception: pass # Silencieux — ne jamais bloquer le patching pour un log # ============================================================================== # WINDOW CENTERING — toujours sur le meme ecran # ============================================================================== def center_window(win, width=None, height=None, parent=None): """Centre une fenetre sur le meme ecran que le parent (ou le curseur si pas de parent)""" win.update_idletasks() if width is None: width = win.winfo_width() if height is None: height = win.winfo_height() try: user32 = ctypes.windll.user32 class POINT(ctypes.Structure): _fields_ = [("x", ctypes.c_long), ("y", ctypes.c_long)] class RECT(ctypes.Structure): _fields_ = [("left", ctypes.c_long), ("top", ctypes.c_long), ("right", ctypes.c_long), ("bottom", ctypes.c_long)] class MONITORINFO(ctypes.Structure): _fields_ = [("cbSize", ctypes.c_ulong), ("rcMonitor", RECT), ("rcWork", RECT), ("dwFlags", ctypes.c_ulong)] MONITOR_DEFAULTTONEAREST = 2 if parent is not None: # Centrer sur l'ecran du parent px = parent.winfo_x() + parent.winfo_width() // 2 py = parent.winfo_y() + parent.winfo_height() // 2 pt = POINT(px, py) else: # Centrer sur l'ecran du curseur pt = POINT() user32.GetCursorPos(ctypes.byref(pt)) hmon = user32.MonitorFromPoint(pt, MONITOR_DEFAULTTONEAREST) mi = MONITORINFO() mi.cbSize = ctypes.sizeof(MONITORINFO) user32.GetMonitorInfoW(hmon, ctypes.byref(mi)) work = mi.rcWork sx = work.left + (work.right - work.left - width) // 2 sy = work.top + (work.bottom - work.top - height) // 2 except Exception: if parent is not None: sx = parent.winfo_x() + (parent.winfo_width() - width) // 2 sy = parent.winfo_y() + (parent.winfo_height() - height) // 2 else: sx = (win.winfo_screenwidth() - width) // 2 sy = (win.winfo_screenheight() - height) // 2 win.geometry(f"{width}x{height}+{sx}+{sy}") # ============================================================================== # SETTINGS # ============================================================================== DEFAULT_SETTINGS = { "keyfile": r"C:\scripts\id_rsa_cybglobal.pem", "keyfile2": r"C:\scripts\id_rsa_cybsecope.ppk", "cybr_user": "CYBP01336", "target_user": "cybsecope", "psmp": "psmp.sanef.fr", "ssh_port": 22, "timeout": 20, "parallelism": 3, "excel_file": "", "patcher": "", "vs_user": "", "splunk_enabled": "false", "splunk_url": "", "splunk_index": "main", "proxy_url": "http://proxy.sanef.fr:8080", "sharepoint_notif_path": "", "network_log_path": "", } def load_settings(): if os.path.exists(SETTINGS_FILE): try: with open(SETTINGS_FILE, "r") as f: s = json.load(f) DEFAULT_SETTINGS.update(s) except Exception: pass return dict(DEFAULT_SETTINGS) def save_settings(settings): try: with open(SETTINGS_FILE, "w") as f: json.dump(settings, f, indent=2) except Exception: pass # ============================================================================== # EXCEL READER # ============================================================================== def get_week_sheet(wb): """Trouve la feuille dont le nom correspond à la semaine courante""" week_num = date.today().isocalendar()[1] candidates = [f"S{week_num}", f"S{week_num:02d}", f"Semaine {week_num}", f"W{week_num}", str(week_num)] for name in candidates: if name in wb.sheetnames: return name # Retourner toutes les feuilles pour que l'utilisateur choisisse return None # Couleur vert Excel = patché OK EXCEL_GREEN = "FF00B050" def read_excel_servers(filepath, sheet_name=None): """Lit le fichier Excel et retourne la liste des serveurs Linux + couleurs. Utilise une copie temporaire pour eviter les conflits OneDrive.""" import shutil, tempfile, io tmp_dir = tempfile.mkdtemp() tmp_path = os.path.join(tmp_dir, os.path.basename(filepath)) wb = None # Methode 1: copie fichier try: shutil.copy2(filepath, tmp_path) wb = openpyxl.load_workbook(tmp_path, data_only=True) except Exception: pass # Methode 2: lecture binaire en memoire (bypass lock OneDrive) if wb is None: try: with open(filepath, "rb") as f: data = io.BytesIO(f.read()) wb = openpyxl.load_workbook(data, data_only=True) except Exception: pass # Methode 3: ouverture directe if wb is None: wb = openpyxl.load_workbook(filepath, data_only=True) if sheet_name is None: sheet_name = get_week_sheet(wb) if sheet_name is None: return None, wb.sheetnames # retourner la liste pour que l'user choisisse ws = wb[sheet_name] # Lire les en-têtes (première ligne non vide) headers = {} header_row = None for row_idx, row in enumerate(ws.iter_rows(max_row=5), start=1): vals = [c.value for c in row if c.value is not None] if len(vals) >= 3: headers = {str(c.value).strip().lower(): c.column - 1 for c in row if c.value is not None} header_row = row_idx break if not headers: return [], wb.sheetnames # Mapping champs (tolérance casse/accents) def find_col(keys): for k in keys: for h, idx in headers.items(): if k.lower() in h.lower(): return idx return None col_server = find_col(["asset name", "nom du serveur", "serveur", "hostname", "server", "nom"]) col_os = find_col(["os", "système", "systeme"]) col_env = find_col(["environnement", "environment", "env"]) col_domain = find_col(["domaine", "domain"]) col_app = find_col(["nom complet", "application", "nom_complet"]) col_accord = find_col(["accord"]) col_date = find_col(["date du patch", "date patch", "date heure", "date_heure", "date patching", "date"]) col_patcher = find_col(["intervenant", "patcheur", "technicien"]) servers = [] for row in ws.iter_rows(min_row=header_row + 1): vals = [c.value for c in row] cells = list(row) if not vals or all(v is None for v in vals): continue def get(col): if col is None or col >= len(vals): return "" v = vals[col] return str(v).strip() if v is not None else "" os_val = get(col_os) if "linux" not in os_val.lower(): continue server_name = get(col_server) if not server_name: continue # Parsing date date_val = None if col_date is not None and col_date < len(vals): raw_date = vals[col_date] if isinstance(raw_date, datetime): date_val = raw_date.date() elif isinstance(raw_date, date): date_val = raw_date elif raw_date: try: date_val = datetime.strptime(str(raw_date).strip()[:10], "%Y-%m-%d").date() except Exception: try: date_val = datetime.strptime(str(raw_date).strip()[:10], "%d/%m/%Y").date() except Exception: date_val = None # Accord — règle stricte : # colonne absente OU cellule vide = NON (on ne patche pas sans accord explicite) # Seul "oui" / True dans la cellule = accord confirmé if col_accord is None or col_accord >= len(vals): raw_accord = "non" else: raw_v = vals[col_accord] if raw_v is None or str(raw_v).strip() == "": raw_accord = "non" # vide = pas d'accord elif isinstance(raw_v, bool): raw_accord = "oui" if raw_v else "non" else: v_str = str(raw_v).strip().lower() raw_accord = "oui" if v_str == "oui" else "non" # Détecter si la cellule serveur a un fond vert = déjà patché already_patched = False if col_server is not None and col_server < len(cells): try: fill = cells[col_server].fill bg = fill.fgColor.rgb if fill and fill.fgColor else "" if bg == EXCEL_GREEN or bg.upper() == EXCEL_GREEN: already_patched = True except Exception: pass servers.append({ "server": server_name, "os": os_val, "env": get(col_env), "domain": get(col_domain), "app": get(col_app), "accord": raw_accord if raw_accord else "non", "date_patch": date_val, "patcher": (get(col_patcher) or "").strip(), "selected": False, "snap_done": False, "is_physical": False, "status": "✅ DÉJÀ PATCHÉ" if already_patched else "—", "patch_status": "PATCHED" if already_patched else "—", "patch_detail": "Fond vert Excel" if already_patched else "", "reboot_required": False, "already_patched": already_patched, "excel_row": cells[0].row if cells else None, }) wb.close() # Nettoyage copie temporaire try: os.remove(tmp_path) os.rmdir(tmp_dir) except Exception: pass return servers, None # ============================================================================== # SSH HELPERS # ============================================================================== def build_fqdn(server, env): if "." in server: return [server] env_low = env.lower() if "recette" in env_low or "rec" in env_low: return [server, f"{server}.sanef-rec.fr"] else: return [server, f"{server}.sanef.groupe"] def load_key(keyfile): if not keyfile or not os.path.exists(keyfile.strip('"').strip("'")): return None keyfile = keyfile.strip('"').strip("'") for cls in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey]: try: return cls.from_private_key_file(keyfile) except Exception: continue return None def ssh_connect(server, env, settings, pkey, pkey2, cyb_password=None): """Connexion SSH avec fallback FQDN et clé. Retourne (client, hostname_effectif)""" candidates = build_fqdn(server, env) is_prod = "prod" in env.lower() or "production" in env.lower() ssh_port = int(settings.get("ssh_port", 22)) for hostname in candidates: if is_prod and cyb_password: # CyberArk keyboard-interactive (PSMP toujours port 22) username = f"{settings['cybr_user']}@{settings['target_user']}@{hostname}" password = cyb_password try: def handler(title, instructions, prompt_list): return [password] * len(prompt_list) transport = paramiko.Transport((settings["psmp"], 22)) transport.connect() transport.auth_interactive(username, handler) client = paramiko.SSHClient() client._transport = transport client._eff_host = hostname return client, hostname except Exception: continue else: # SSH direct avec clé for key in [k for k in [pkey, pkey2] if k]: try: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname=hostname, port=ssh_port, username=settings["target_user"], pkey=key, timeout=settings["timeout"], look_for_keys=False, allow_agent=False) client._eff_host = hostname return client, hostname except Exception: continue return None, None def run_cmd(client, cmd, timeout=300): try: _, stdout, stderr = client.exec_command(cmd, timeout=timeout) # Timeout sur le read pour eviter les blocages PSMP stdout.channel.settimeout(timeout) stderr.channel.settimeout(timeout) out = stdout.read().decode("utf-8", errors="ignore").strip() err = stderr.read().decode("utf-8", errors="ignore").strip() return out, err except socket.timeout: return "", "TIMEOUT" except Exception as e: return "", str(e) def run_cmd_stream(client, cmd, timeout=300, on_line=None): """Execute une commande SSH et streame la sortie ligne par ligne. on_line(line) est appele pour chaque ligne recue en temps reel. Retourne (full_output, stderr) comme run_cmd.""" try: _, stdout, stderr = client.exec_command(cmd, timeout=timeout) stdout.channel.settimeout(timeout) stderr.channel.settimeout(timeout) lines = [] buf = "" while not stdout.channel.exit_status_ready() or stdout.channel.recv_ready(): if stdout.channel.recv_ready(): chunk = stdout.channel.recv(4096).decode("utf-8", errors="ignore") buf += chunk while "\n" in buf: line, buf = buf.split("\n", 1) line = line.rstrip("\r") lines.append(line) if on_line and line.strip(): on_line(line) else: time.sleep(0.1) # Lire le reste remaining = stdout.read().decode("utf-8", errors="ignore") buf += remaining for line in buf.splitlines(): line = line.rstrip("\r") lines.append(line) if on_line and line.strip(): on_line(line) err = stderr.read().decode("utf-8", errors="ignore").strip() return "\n".join(lines).strip(), err except socket.timeout: return "\n".join(lines).strip() if 'lines' in dir() else "", "TIMEOUT" except Exception as e: return "\n".join(lines).strip() if 'lines' in dir() else "", str(e) def detect_physical(client): """Détecte si le serveur est physique ou VM""" out, _ = run_cmd(client, "sudo virt-what 2>/dev/null || echo UNKNOWN", 10) if not out or out == "UNKNOWN": out2, _ = run_cmd(client, "sudo dmidecode -s system-manufacturer 2>/dev/null || echo UNKNOWN", 10) vmware_kw = ["vmware", "virtualbox", "kvm", "xen", "qemu", "microsoft corporation"] if any(k in out2.lower() for k in vmware_kw): return False # VM phys_kw = ["hp", "dell", "lenovo", "ibm", "supermicro", "cisco"] if any(k in out2.lower() for k in phys_kw): return True # Physique if out and out != "UNKNOWN": return False # virt-what a trouvé quelque chose = VM return False # par défaut supposer VM (plus sûr) def _is_fl_podman(server_name): """Retourne True si le serveur Flux Libre utilise Podman (pas les BST/HAProxy)""" name_low = server_name.lower() # BST = HAProxy load balancers → pas Podman, excludes standards if "bst" in name_low or "hbst" in name_low or "bbst" in name_low or "abst" in name_low: return False return True def build_yum_excludes(domain, exclude_kernel, server_name=""): """Construit la chaine d'excludes yum selon le domaine""" domain_low = domain.lower() is_flux_libre = "flux libre" in domain_low if is_flux_libre and _is_fl_podman(server_name): excludes = YUM_EXCLUDES_FLUX_LIBRE else: excludes = YUM_EXCLUDES_STD if exclude_kernel: excludes += " --exclude=*kernel*" return excludes def build_yum_command(domain, packages, exclude_kernel, dryrun, server_name="", extra_excludes=""): """Construit la commande yum selon le domaine""" excludes = build_yum_excludes(domain, exclude_kernel, server_name) if extra_excludes: excludes += " " + extra_excludes if dryrun: if packages: cmd = f"sudo yum check-update {packages} 2>/dev/null | grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]' | head -20" else: cmd = f"sudo yum check-update {excludes} 2>/dev/null | grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]' | head -30" else: if packages: cmd = f"sudo yum update -y {packages} 2>&1" else: cmd = f"sudo yum update -y {excludes} 2>&1" return cmd # ============================================================================== # LOGIN DIALOG # ============================================================================== class LoginDialog(tk.Toplevel): def __init__(self, parent, db): super().__init__(parent) self.db = db self.result = None # {"username":..., "role":...} ou None self.title("SANEF Patch Manager - Connexion") self.configure(bg="#1e1e2e") self.resizable(False, False) self.protocol("WM_DELETE_WINDOW", self._quit) self._build() center_window(self, 420, 340) self.grab_set() self.bind("", lambda e: self._login()) self.wait_window() def _build(self): BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa" tk.Label(self, text="SANEF PATCH MANAGER", bg="#181825", fg=ACCENT, font=("Consolas", 14, "bold"), pady=10).pack(fill="x") tk.Label(self, text="Authentification requise", bg=BG, fg="#6c7086", font=("Consolas", 10)).pack(pady=(10, 15)) f = tk.Frame(self, bg=BG) f.pack(pady=5) tk.Label(f, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 11), width=14, anchor="w").grid(row=0, column=0, padx=8, pady=6) self.user_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11), insertbackground=FG, width=22) self.user_entry.grid(row=0, column=1, padx=8, pady=6) self.user_entry.focus_set() tk.Label(f, text="Mot de passe :", bg=BG, fg=FG, font=("Consolas", 11), width=14, anchor="w").grid(row=1, column=0, padx=8, pady=6) self.pwd_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11), insertbackground=FG, width=22, show="*") self.pwd_entry.grid(row=1, column=1, padx=8, pady=6) self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8", font=("Consolas", 10)) self.msg_label.pack(pady=8) bf = tk.Frame(self, bg=BG) bf.pack(pady=10) tk.Button(bf, text="Connexion", bg="#40a02b", fg="white", font=("Consolas", 11, "bold"), padx=20, pady=4, command=self._login).pack(side="left", padx=8) tk.Button(bf, text="Quitter", bg="#313244", fg="#cdd6f4", font=("Consolas", 10), padx=12, pady=4, command=self._quit).pack(side="left", padx=8) tk.Label(self, text=f"SQATM Patch Manager v{VERSION}", bg=BG, fg="#45475a", font=("Consolas", 9)).pack(side="bottom", pady=5) def _login(self): username = self.user_entry.get().strip() password = self.pwd_entry.get() if not username or not password: self.msg_label.configure(text="Saisir utilisateur et mot de passe") return user, error = self.db.authenticate(username, password) if error: self.msg_label.configure(text=error) self.pwd_entry.delete(0, "end") return self.db.log_action(username, "LOGIN", f"role={user['role']}") self.result = user self.destroy() def _quit(self): self.result = None self.destroy() # ============================================================================== # CHANGE PASSWORD DIALOG # ============================================================================== class ChangePasswordDialog(tk.Toplevel): def __init__(self, parent, db, username, forced=False): super().__init__(parent) self._parent = parent self.db = db self.username = username self.forced = forced self.success = False self.title("Changement de mot de passe") self.configure(bg="#1e1e2e") self.resizable(False, False) if forced: self.protocol("WM_DELETE_WINDOW", lambda: None) self._build() center_window(self, 420, 300, parent=parent) self.grab_set() self.bind("", lambda e: self._change()) self.wait_window() def _build(self): BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa" msg = "Premier login : vous devez changer votre mot de passe" if self.forced else "Changement de mot de passe" tk.Label(self, text=msg, bg="#181825", fg=ACCENT, font=("Consolas", 11, "bold"), pady=10).pack(fill="x") f = tk.Frame(self, bg=BG) f.pack(pady=15) tk.Label(f, text="Nouveau mdp :", bg=BG, fg=FG, font=("Consolas", 10), width=16, anchor="w").grid(row=0, column=0, padx=8, pady=6) self.new_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 10), insertbackground=FG, width=22, show="*") self.new_entry.grid(row=0, column=1, padx=8, pady=6) self.new_entry.focus_set() tk.Label(f, text="Confirmer :", bg=BG, fg=FG, font=("Consolas", 10), width=16, anchor="w").grid(row=1, column=0, padx=8, pady=6) self.confirm_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 10), insertbackground=FG, width=22, show="*") self.confirm_entry.grid(row=1, column=1, padx=8, pady=6) self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8", font=("Consolas", 10)) self.msg_label.pack(pady=5) tk.Button(self, text="Valider", bg="#40a02b", fg="white", font=("Consolas", 11, "bold"), padx=20, pady=4, command=self._change).pack(pady=10) def _change(self): p1 = self.new_entry.get() p2 = self.confirm_entry.get() if len(p1) < 4: self.msg_label.configure(text="Minimum 4 caracteres") return if p1 != p2: self.msg_label.configure(text="Les mots de passe ne correspondent pas") return self.db.change_password(self.username, p1) self.db.log_action(self.username, "CHANGE_PASSWORD", "") self.success = True self.destroy() # ============================================================================== # DIALOGUE SETTINGS # ============================================================================== class SettingsDialog(tk.Toplevel): def __init__(self, parent, settings): super().__init__(parent) self._parent = parent self.title("Parametres") self.configure(bg="#1e1e2e") self.resizable(True, True) self.settings = dict(settings) self.result = None self._build() # Taille adaptee a l'ecran sh = parent.winfo_screenheight() h = min(780, int(sh * 0.85)) center_window(self, 600, h, parent=parent) self.grab_set() self.wait_window() def _build(self): BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa" # Titre fixe en haut tk.Label(self, text="PARAMETRES", bg="#313244", fg=ACCENT, font=("Consolas",13,"bold"), pady=8).pack(fill="x") # Zone scrollable canvas = tk.Canvas(self, bg=BG, highlightthickness=0) scrollbar = ttk.Scrollbar(self, orient="vertical", command=canvas.yview) self.scroll_frame = tk.Frame(canvas, bg=BG) self.scroll_frame.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Scroll molette canvas.bind_all("", lambda e: canvas.yview_scroll(int(-1*(e.delta/120)), "units")) sf = self.scroll_frame def row(label, key, default=""): f = tk.Frame(sf, bg=BG) f.pack(fill="x", padx=20, pady=4) tk.Label(f, text=label, bg=BG, fg=FG, font=("Consolas",10), width=22, anchor="w").pack(side="left") var = tk.StringVar(value=self.settings.get(key, default)) setattr(self, f"var_{key}", var) tk.Entry(f, textvariable=var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32).pack(side="left", padx=4) return var tk.Label(sf, text="SSH — Recette", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Cle SSH principale :", "keyfile") row("Cle SSH secondaire :", "keyfile2") tk.Label(sf, text="CyberArk — Production", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Compte CyberArk :", "cybr_user", "CYBP01336") row("Utilisateur cible :", "target_user", "cybsecope") row("PSMP hostname :", "psmp", "psmp.sanef.fr") tk.Label(sf, text="Identite patcheur", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Nom du patcheur :", "patcher", "") tk.Label(sf, text="vSphere — Snapshots", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Utilisateur vCenter :", "vs_user", "") f_pwd = tk.Frame(sf, bg=BG) f_pwd.pack(fill="x", padx=20, pady=4) tk.Label(f_pwd, text="Mot de passe vCenter :", bg=BG, fg=FG, font=("Consolas",10), width=22, anchor="w").pack(side="left") self.var_vs_pwd = tk.StringVar(value=self.settings.get("_vs_pwd_session", "")) # Session only tk.Entry(f_pwd, textvariable=self.var_vs_pwd, show="*", bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32).pack(side="left", padx=4) tk.Label(f_pwd, text="(non stocke)", bg=BG, fg="#6c7086", font=("Consolas",8)).pack(side="left", padx=4) tk.Label(sf, text="Connexion SSH", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Port SSH :", "ssh_port", "22") row("Timeout SSH (s) :", "timeout", "20") row("Parallelisme :", "parallelism", "3") tk.Label(sf, text="Splunk HEC (logs)", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) self.var_splunk_enabled = tk.BooleanVar( value=self.settings.get("splunk_enabled", "false") == "true") tk.Checkbutton(sf, text="Activer l'envoi vers Splunk", variable=self.var_splunk_enabled, bg=BG, fg=FG, selectcolor=BG2, activebackground=BG, font=("Consolas",10)).pack(anchor="w", padx=20) row("URL HEC :", "splunk_url", "https://splunk.sanef.fr:8088") # Token non stocke dans le JSON — session only f_tok = tk.Frame(sf, bg=BG) f_tok.pack(fill="x", padx=20, pady=4) tk.Label(f_tok, text="Token HEC :", bg=BG, fg=FG, font=("Consolas",10), width=22, anchor="w").pack(side="left") self.var_splunk_token = tk.StringVar(value=self.settings.get("_splunk_token_session", "")) # Session only tk.Entry(f_tok, textvariable=self.var_splunk_token, show="*", bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32).pack(side="left", padx=4) tk.Label(f_tok, text="(non stocke)", bg=BG, fg="#6c7086", font=("Consolas",8)).pack(side="left", padx=4) row("Index :", "splunk_index", "main") row("Proxy (optionnel) :", "proxy_url", "http://proxy.sanef.fr:8080") tk.Label(sf, text="Notifications Teams", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Dossier SharePoint :", "sharepoint_notif_path", "") tk.Label(sf, text="Chemin local du dossier SharePoint synchro (ex: C:\\Users\\xxx\\sanefgroupe...\\notifications)", bg=BG, fg="#6c7086", font=("Consolas",7)).pack(anchor="w", padx=20) tk.Label(sf, text="Logs réseau", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Dossier log réseau :", "network_log_path", "") tk.Label(sf, text="Partage réseau pour les logs (ex: \\\\serveur\\share\\logs\\patching)", bg=BG, fg="#6c7086", font=("Consolas",7)).pack(anchor="w", padx=20) # Boutons en bas du scroll bf = tk.Frame(sf, bg=BG) bf.pack(pady=16) tk.Button(bf, text="Sauvegarder", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), padx=12, pady=6, command=self._save).pack(side="left", padx=8) tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4", font=("Consolas",10), padx=12, pady=6, command=self.destroy).pack(side="left", padx=8) def _save(self): keys = ["keyfile","keyfile2","cybr_user","target_user","psmp","ssh_port","timeout","parallelism", "patcher","vs_user","splunk_url","splunk_token","splunk_index","proxy_url", "sharepoint_notif_path","network_log_path"] for k in keys: var = getattr(self, f"var_{k}", None) if var: val = var.get().strip() if k in ("timeout","parallelism","ssh_port"): try: val = int(val) except: val = DEFAULT_SETTINGS.get(k, 22 if k == "ssh_port" else 20) self.settings[k] = val # Splunk enabled (stocke dans JSON — pas sensible) if hasattr(self, "var_splunk_enabled"): self.settings["splunk_enabled"] = "true" if self.var_splunk_enabled.get() else "false" # vCenter pwd et Splunk token : session only, jamais dans le JSON if hasattr(self, "var_vs_pwd"): self.settings["_vs_pwd_session"] = self.var_vs_pwd.get() if hasattr(self, "var_splunk_token"): self.settings["_splunk_token_session"] = self.var_splunk_token.get() self.result = self.settings # Sauvegarder dans JSON SANS les secrets to_save = {k: v for k, v in self.settings.items() if not k.startswith("_")} save_settings(to_save) self.destroy() # ============================================================================== # DIALOGUE CONFIRMATION COMMANDE YUM # ============================================================================== class YumConfirmDialog(tk.Toplevel): def __init__(self, parent, server, cmd): super().__init__(parent) self.title("Confirmation commande patch") self.configure(bg="#1e1e2e") self.result = None # "apply", "apply_all", "cancel" self.cmd_var = tk.StringVar(value=cmd) self._build(server, cmd) center_window(self, 680, 340, parent=parent) self.grab_set() self.wait_window() def _build(self, server, cmd): BG = "#1e1e2e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"; YELLOW = "#f9e2af" tk.Label(self, text="⚠️ CONFIRMATION COMMANDE PATCH", bg="#313244", fg=ACCENT, font=("Consolas",12,"bold"), pady=8).pack(fill="x") tk.Label(self, text=f"Serveur : {server}", bg=BG, fg=YELLOW, font=("Consolas",11)).pack(anchor="w", padx=16, pady=6) tk.Label(self, text="Commande à exécuter :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", padx=16) cmd_entry = tk.Text(self, height=5, bg="#11111b", fg="#a6e3a1", font=("Consolas",10), wrap="word", insertbackground=FG) cmd_entry.insert("1.0", cmd) cmd_entry.pack(fill="x", padx=16, pady=4) self._cmd_text = cmd_entry bf = tk.Frame(self, bg=BG) bf.pack(pady=12) tk.Button(bf, text="▶ Appliquer", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), padx=10, pady=6, command=lambda: self._set("apply")).pack(side="left", padx=6) tk.Button(bf, text="▶▶ Appliquer à TOUS", bg="#1e66f5", fg="white", font=("Consolas",11,"bold"), padx=10, pady=6, command=lambda: self._set("apply_all")).pack(side="left", padx=6) tk.Button(bf, text="✗ Annuler ce serveur", bg="#d20f39", fg="white", font=("Consolas",10), padx=10, pady=6, command=lambda: self._set("cancel")).pack(side="left", padx=6) def _set(self, val): self.result = val self.final_cmd = self._cmd_text.get("1.0", "end").strip() self.destroy() # ============================================================================== # APPLICATION PRINCIPALE # ============================================================================== class PatchManagerV2: def __init__(self, root, db, current_user): self.root = root self.db = db self.current_user = current_user # {"username":..., "role":...} self.root.title(f"SANEF Linux Patch Manager v{VERSION} — {current_user['username']} ({current_user['role']})") self.root.configure(bg="#1e1e2e") # Plein ecran sur le moniteur ou se trouve la fenetre try: center_window(root, root.winfo_screenwidth(), root.winfo_screenheight(), parent=root) root.after(100, lambda: root.state("zoomed")) except Exception: root.state("zoomed") self.settings = load_settings() self.servers = [] self.results = [] self.running = False self.pkey = None self.pkey2 = None self.cyb_pwd = None self.apply_all_cmd = False self.is_current_week = True # True = semaine courante (patch autorise) self._build_ui() self._reload_keys() self.audit("APP_START", "") # Verifier si des interventions Teams sont restees ouvertes (crash precedent) self.root.after(500, self._teams_check_crash_recovery) # ========================================================================== # COULEURS # ========================================================================== BG = "#1e1e2e" BG2 = "#2a2a3e" FG = "#cdd6f4" ACCENT = "#89b4fa" GREEN = "#a6e3a1" RED = "#f38ba8" YELLOW = "#f9e2af" ORANGE = "#fab387" BTN = "#313244" # ========================================================================== # AUDIT HELPER # ========================================================================== def audit(self, action, details=""): username = self.current_user["username"] self.db.log_action(username, action, details) # Envoi Splunk en arriere-plan (non bloquant) threading.Thread(target=send_to_splunk, args=(self.settings, { "action": action, "user": username, "details": details, "timestamp": datetime.now().isoformat(), }), daemon=True).start() # Log réseau server = details.split("|")[0].strip() if "|" in details else "" threading.Thread(target=self._write_network_log, args=(action, server, details), daemon=True).start() # ========================================================================== # NOTIFICATIONS TEAMS via SharePoint / Power Automate # ========================================================================== # Sous-dossiers SharePoint pour le routing TEAMS_ROUTES = { "dsi_general": "reboot", "fl_prod": "fl_prod", "fl_bst": "fl_bst", "fl_horsprod": "fl_horsprod", "peage": "peage", "lan_delcour": "lan_delcour", } def _get_teams_route(self, server_info, msg_type): """Determine le sous-dossier SharePoint pour le message Teams. msg_type: 'debut', 'fin', 'reboot'""" # Reboot → toujours DSI General if msg_type == "reboot": return "dsi_general" domain = server_info.get("domain", "").lower() env = server_info.get("env", "").lower() name = server_info.get("server", "").lower() responsable = server_info.get("valideur", "").lower() # Responsable Domaine DTS # Flux Libre if "flux libre" in domain: if "bst" in name: return "fl_bst" elif env in ("production",): return "fl_prod" else: return "fl_horsprod" # Peage if "peage" in domain or "péage" in domain: return "peage" # LAN + responsable Laurent DELCOUR if "delcour" in responsable: return "lan_delcour" # Default: pas de notification pour les autres return None def _format_teams_msg(self, server_name, msg_type, intervenant): """Formate le message Teams""" nom = intervenant.capitalize() if intervenant else "SecOps" if msg_type == "debut": return f"[SECOPS] : Intervenant({nom}) => Debut d'intervention sur {server_name}" elif msg_type == "reboot": return f"[SECOPS] : Intervenant({nom}) => Reboot suite MAJ de {server_name}" elif msg_type == "fin": return f"[SECOPS] : Intervenant({nom}) => Fin d'intervention sur {server_name}" elif msg_type == "annulation": return f"[SECOPS] : Intervenant({nom}) => Annulation intervention sur {server_name}" return "" # Fichier d'etat Teams persistant (crash recovery) TEAMS_STATE_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_teams_state.json") def _teams_save_state(self): """Sauvegarde l'etat des notifications Teams en cours (pour crash recovery)""" pending = [] for s in getattr(self, "servers", []): if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"): pending.append({ "server": s.get("server", ""), "domain": s.get("domain", ""), "env": s.get("env", ""), "valideur": s.get("valideur", ""), "timestamp": datetime.now().isoformat(), }) try: if pending: with open(self.TEAMS_STATE_FILE, "w") as f: json.dump(pending, f) elif os.path.exists(self.TEAMS_STATE_FILE): os.remove(self.TEAMS_STATE_FILE) except Exception: pass def _teams_check_crash_recovery(self): """Au demarrage, verifie si des interventions sont restees ouvertes (crash/bug)""" if not os.path.exists(self.TEAMS_STATE_FILE): return try: with open(self.TEAMS_STATE_FILE, "r") as f: pending = json.load(f) if not pending: return names = "\n".join(p["server"] for p in pending[:10]) if messagebox.askyesno( "Interventions non terminees", f"L'application a detecte {len(pending)} intervention(s) sans message de fin :\n\n" f"{names}\n\n" f"Envoyer un message d'annulation Teams pour ces serveurs ?", icon="warning"): for p in pending: self._send_teams_notification(p, "annulation") os.remove(self.TEAMS_STATE_FILE) except Exception: try: os.remove(self.TEAMS_STATE_FILE) except Exception: pass def _send_teams_notification(self, server_info, msg_type): """Envoie une notification Teams via fichier SharePoint (non bloquant)""" if hasattr(self, "dryrun_var") and self.dryrun_var.get(): return # Pas de notification en dry run route = self._get_teams_route(server_info, msg_type) if not route: return # Pas de notification pour ce serveur sp_base = self.settings.get("sharepoint_notif_path", "").strip() if not sp_base: return # SharePoint non configure intervenant = self.settings.get("patcher", "").strip() or self.current_user.get("username", "SecOps") message = self._format_teams_msg(server_info.get("server", ""), msg_type, intervenant) if not message: return def _write_notif(): try: # Ecrire le fichier Teams (message seul) notif_dir = os.path.join(sp_base, route) os.makedirs(notif_dir, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") fname = f"{msg_type}_{server_info.get('server', 'unknown')}_{ts}.txt" fpath = os.path.join(notif_dir, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(message) except Exception: pass # Ecrire dans le log réseau (avec métadonnées) self._write_network_log(msg_type, server_info.get("server", ""), message) threading.Thread(target=_write_notif, daemon=True).start() # Mettre a jour l'etat persistant if msg_type == "debut": self._teams_save_state() elif msg_type in ("fin", "annulation"): self._teams_save_state() def _write_network_log(self, action, server, message=""): """Ecrit une ligne de log sur le partage réseau (avec métadonnées utilisateur)""" log_path = self.settings.get("network_log_path", "").strip() if not log_path: return try: os.makedirs(log_path, exist_ok=True) log_file = os.path.join(log_path, f"patch_manager_{date.today():%Y%m%d}.log") app_user = self.current_user.get("username", "unknown") win_user = os.environ.get("USERNAME", os.environ.get("USER", "unknown")) hostname = socket.gethostname() ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"{ts} | {action:12s} | {app_user} | {win_user}@{hostname} | {server} | {message}\n" with open(log_file, "a", encoding="utf-8") as f: f.write(line) except Exception: pass # Ne jamais bloquer le patching # ========================================================================== # BUILD UI # ========================================================================== def _build_ui(self): BG=self.BG; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # Style ttk style = ttk.Style() style.theme_use("clam") style.configure("TFrame", background=self.BG) style.configure("TLabel", background=self.BG, foreground=self.FG, font=("Consolas",10)) style.configure("Treeview", background=self.BG2, foreground=self.FG, fieldbackground=self.BG2, font=("Consolas",9), rowheight=22) style.configure("Treeview.Heading", background=self.BTN, foreground=self.ACCENT, font=("Consolas",10,"bold")) style.map("Treeview", background=[("selected","#45475a")]) style.configure("TCombobox", fieldbackground=self.BG2, foreground=self.FG, background=self.BG2, font=("Consolas",10)) style.configure("TCheckbutton",background=self.BG, foreground=self.FG, font=("Consolas",10)) # ── Barre titre ──────────────────────────────────────────────────────── tb = tk.Frame(self.root, bg="#181825", pady=6) tb.pack(fill="x") tk.Label(tb, text=f"⚡ SANEF LINUX PATCH MANAGER v{VERSION}", bg="#181825", fg=ACCENT, font=("Consolas",14,"bold")).pack(side="left", padx=12) tk.Label(tb, text=f"Semaine {date.today().isocalendar()[1]} | {date.today():%d/%m/%Y}", bg="#181825", fg="#6c7086", font=("Consolas",10)).pack(side="left", padx=12) tk.Button(tb, text="A propos", bg=BTN, fg="#6c7086", font=("Consolas",9), pady=2, command=self._show_about).pack(side="right", padx=4) tk.Button(tb, text="Mon MDP", bg=BTN, fg="#fab387", font=("Consolas",9), pady=2, command=self._change_my_password).pack(side="right", padx=4) tk.Button(tb, text="Settings", bg=BTN, fg=FG, font=("Consolas",10), pady=2, command=self._open_settings).pack(side="right", padx=12) # ── Notebook (onglets) ───────────────────────────────────────────────── nb_style = ttk.Style() nb_style.configure("TNotebook", background=self.BG, borderwidth=0) nb_style.configure("TNotebook.Tab", background=self.BTN, foreground=self.FG, font=("Consolas",10,"bold"), padding=[12,4]) nb_style.map("TNotebook.Tab", background=[("selected","#313244")], foreground=[("selected",self.ACCENT)]) self.nb = ttk.Notebook(self.root) self.nb.pack(fill="both", expand=True, padx=6, pady=4) # Onglets self.tab_servers = tk.Frame(self.nb, bg=BG) self.tab_patch = tk.Frame(self.nb, bg=BG) self.tab_post = tk.Frame(self.nb, bg=BG) self.tab_report = tk.Frame(self.nb, bg=BG) self.tab_audit = tk.Frame(self.nb, bg=BG) self.nb.add(self.tab_servers, text="1. Selection serveurs") self.nb.add(self.tab_patch, text="2. Patch") self.nb.add(self.tab_post, text="3. Post-patching") self.nb.add(self.tab_report, text="4. Rapport") if self.current_user["role"] == "admin": self.nb.add(self.tab_audit, text="5. Audit") self.tab_users = tk.Frame(self.nb, bg=BG) self.nb.add(self.tab_users, text="6. Utilisateurs") self._build_tab_servers() self._build_tab_patch() self._build_tab_post() self._build_tab_report() if self.current_user["role"] == "admin": self._build_tab_audit() self._build_tab_users() # Viewer = onglets patch/post desactives if self.current_user["role"] == "viewer": self.nb.tab(self.tab_patch, state="disabled") self.nb.tab(self.tab_post, state="disabled") # ========================================================================== # ONGLET 1 — SÉLECTION SERVEURS # ========================================================================== def _build_tab_servers(self): tab = self.tab_servers BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # ── Ligne 1 : Excel ─────────────────────────────────────────────────── ef = tk.Frame(tab, bg=BG) ef.pack(fill="x", padx=10, pady=6) tk.Label(ef, text="📊 Fichier Excel :", bg=BG, fg=FG, font=("Consolas",10)).pack(side="left") self.excel_var = tk.StringVar(value=self.settings.get("excel_file","")) tk.Entry(ef, textvariable=self.excel_var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=55).pack(side="left", padx=6) tk.Button(ef, text="Parcourir", bg=BTN, fg=FG, font=("Consolas",9), command=self._browse_excel).pack(side="left", padx=2) # Feuille tk.Label(ef, text="Feuille :", bg=BG, fg=FG, font=("Consolas",10)).pack(side="left", padx=(12,2)) self.sheet_var = tk.StringVar() self.sheet_cb = ttk.Combobox(ef, textvariable=self.sheet_var, state="readonly", width=10) self.sheet_cb.pack(side="left", padx=2) self.sheet_cb.bind("<>", lambda e: self._load_sheet()) # Navigation rapide semaines tk.Button(ef, text="◀", bg=BTN, fg=FG, font=("Consolas",10), width=2, command=self._prev_week).pack(side="left", padx=1) tk.Button(ef, text="▶", bg=BTN, fg=FG, font=("Consolas",10), width=2, command=self._next_week).pack(side="left", padx=1) tk.Label(ef, text="S. courante", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(side="left", padx=2) tk.Button(ef, text="↺", bg=BTN, fg=self.ACCENT, font=("Consolas",10), width=2, command=self._goto_current_week).pack(side="left", padx=1) tk.Button(ef, text="Charger", bg="#1e66f5", fg="white", font=("Consolas",10,"bold"), pady=3, command=self._load_excel).pack(side="left", padx=8) # Indicateur semaine courante/visu self.week_mode_lbl = tk.Label(ef, text="", bg=self.BG, font=("Consolas",10,"bold")) self.week_mode_lbl.pack(side="left", padx=8) # ── Ligne 2 : Filtres ───────────────────────────────────────────────── ff = tk.Frame(tab, bg=BG) ff.pack(fill="x", padx=10, pady=4) tk.Label(ff, text="Filtres :", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(side="left") # Date du jour self.filter_today_var = tk.BooleanVar(value=False) tk.Checkbutton(ff, text="📅 Aujourd'hui uniquement", variable=self.filter_today_var, bg=BG, fg=self.YELLOW, selectcolor=BG2, activebackground=BG, font=("Consolas",10)).pack(side="left", padx=8) # Filtre Intervenant — visible uniquement pour admin self.filter_patcher_var = tk.StringVar(value="Tous") if self.current_user["role"] == "admin": tk.Label(ff, text="Intervenant :", bg=BG, fg=self.YELLOW, font=("Consolas",10,"bold")).pack(side="left", padx=(8,2)) self.filter_patcher_cb = ttk.Combobox(ff, textvariable=self.filter_patcher_var, state="readonly", width=14) self.filter_patcher_cb.pack(side="left", padx=2) self.filter_patcher_cb.bind("<>", lambda e: self._apply_filters()) else: self.filter_patcher_cb = None # Non-admin : pas de filtre intervenant for label, attr in [("Env", "filter_env"), ("Domaine", "filter_domain"), ("Application", "filter_app")]: tk.Label(ff, text=f"{label}:", bg=BG, fg=FG, font=("Consolas",10)).pack(side="left", padx=(8,2)) var = tk.StringVar(value="Tous") setattr(self, f"{attr}_var", var) cb = ttk.Combobox(ff, textvariable=var, state="readonly", width=14) setattr(self, f"{attr}_cb", cb) cb.pack(side="left", padx=2) tk.Button(ff, text="Rechercher", bg=BTN, fg=ACCENT, font=("Consolas",10,"bold"), pady=3, command=self._apply_filters).pack(side="left", padx=6) # ── Ligne 3 : Actions sélection ─────────────────────────────────────── af = tk.Frame(tab, bg=BG) af.pack(fill="x", padx=10, pady=2) for txt, cmd in [("✓ Tout (accordé)", self._select_all_ok), ("✗ Aucun", self._select_none), ("↕ Inverser", self._invert)]: tk.Button(af, text=txt, bg=BTN, fg=FG, font=("Consolas",9), command=cmd).pack(side="left", padx=3) self.count_lbl = tk.Label(af, text="", bg=BG, fg=self.YELLOW, font=("Consolas",9)) self.count_lbl.pack(side="right", padx=10) # ── Treeview serveurs ───────────────────────────────────────────────── tree_frame = tk.Frame(tab, bg=BG) tree_frame.pack(fill="both", expand=True, padx=10, pady=4) cols = ("sel","accord","serveur","intervenant","env","domaine","app","date_patch","snap","statut") self.srv_tree = ttk.Treeview(tree_frame, columns=cols, show="headings", height=22) hdrs = {"sel":"✓","accord":"Accord","serveur":"Serveur", "intervenant":"Intervenant","env":"Env", "domaine":"Domaine","app":"Application","date_patch":"Date patch", "snap":"Snapshot","statut":"Statut"} widths = {"sel":30,"accord":70,"serveur":180,"intervenant":90,"env":80,"domaine":120, "app":150,"date_patch":90,"snap":80,"statut":100} for c in cols: self.srv_tree.heading(c, text=hdrs[c]) self.srv_tree.column(c, width=widths[c], anchor="center" if c in ("sel","accord","snap","statut") else "w") self.srv_tree.tag_configure("no_accord", foreground="#585b70", background="#1e1e2e") self.srv_tree.tag_configure("ok", foreground=self.GREEN) self.srv_tree.tag_configure("warn", foreground=self.YELLOW) self.srv_tree.tag_configure("ko", foreground=self.RED) self.srv_tree.tag_configure("selected", foreground=self.FG, background="#313244") self.srv_tree.tag_configure("normal", foreground=self.FG) self.srv_tree.tag_configure("already_patched",foreground="#1e1e2e", background="#40a02b") vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.srv_tree.yview) hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.srv_tree.xview) self.srv_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) self.srv_tree.pack(side="left", fill="both", expand=True) vsb.pack(side="right", fill="y") hsb.pack(side="bottom", fill="x") self.srv_tree.bind("", self._toggle_srv) # ── Boutons prerequis + snapshot ────────────────────────────────────── prereq_frame = tk.Frame(tab, bg=BG) prereq_frame.pack(fill="x", padx=10, pady=4) self.prereq_btn = tk.Button(prereq_frame, text="Verifier prerequis", bg="#e64553", fg="white", font=("Consolas",12,"bold"), pady=6, command=self._check_prerequisites) self.prereq_btn.pack(side="left", padx=4) self.prereq_status = tk.Label(prereq_frame, text="", bg=BG, fg=self.FG, font=("Consolas",10)) self.prereq_status.pack(side="left", padx=12) self.snap_btn = tk.Button(prereq_frame, text="Prendre snapshot vSphere", bg="#313244", fg="#6c7086", font=("Consolas",10), pady=4, state="disabled", command=self._take_snapshots) self.snap_btn.pack(side="left", padx=4) tk.Button(prereq_frame, text="▶ Aller au Patch →", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), pady=5, command=lambda: self.nb.select(self.tab_patch)).pack(side="right", padx=4) # ========================================================================== # ONGLET 2 — PATCH # ========================================================================== def _build_tab_patch(self): tab = self.tab_patch BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN top = tk.Frame(tab, bg=BG) top.pack(fill="both", expand=True, padx=10, pady=8) # Colonne gauche : options (scrollable) left_canvas = tk.Canvas(top, bg=BG, highlightthickness=0, width=380) left_sb = ttk.Scrollbar(top, orient="vertical", command=left_canvas.yview) left = tk.Frame(left_canvas, bg=BG) left.bind("", lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all"))) left_canvas.create_window((0, 0), window=left, anchor="nw") left_canvas.configure(yscrollcommand=left_sb.set) left_canvas.pack(side="left", fill="y") left_sb.pack(side="left", fill="y") left_canvas.bind_all("", lambda e: left_canvas.yview_scroll(int(-1*(e.delta/120)), "units")) tk.Label(left, text="OPTIONS PATCH", bg=BTN, fg=ACCENT, font=("Consolas",11,"bold"), padx=8, pady=4).pack(fill="x") # Preset packages tk.Label(left, text="Preset packages A PATCHER :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(8,2)) self.preset_var = tk.StringVar(value=list(PRESET_PACKAGES.keys())[0]) preset_cb = ttk.Combobox(left, textvariable=self.preset_var, values=list(PRESET_PACKAGES.keys()), state="readonly", width=30) preset_cb.pack(anchor="w") preset_cb.bind("<>", self._on_preset) # Champ packages a patcher tk.Label(left, text="Packages specifiques a PATCHER :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(8,2)) self.packages_var = tk.StringVar(value="") self.packages_entry = tk.Entry(left, textvariable=self.packages_var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32) self.packages_entry.pack(anchor="w") tk.Label(left, text="Ex: gnutls libsoup rhc", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w") # Champ packages a exclure tk.Label(left, text="Packages specifiques a EXCLURE :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(8,2)) self.custom_excludes_var = tk.StringVar(value="") self.custom_excludes_entry = tk.Entry(left, textvariable=self.custom_excludes_var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32) self.custom_excludes_entry.pack(anchor="w") tk.Label(left, text="Ex: kernel* podman* docker*", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w") # Commande finale editable tk.Label(left, text="Commande finale (editable) :", bg=BG, fg=self.YELLOW, font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2)) self.final_cmd_var = tk.StringVar(value="") self.final_cmd_entry = tk.Entry(left, textvariable=self.final_cmd_var, bg="#181825", fg=self.YELLOW, font=("Consolas",9), insertbackground=self.YELLOW, width=48) self.final_cmd_entry.pack(anchor="w") tk.Button(left, text="Generer la commande", bg=BTN, fg=FG, font=("Consolas",9), command=self._preview_cmd).pack(anchor="w", pady=(4,0)) # Aide help_frame = tk.Frame(left, bg="#181825", pady=4, padx=8) help_frame.pack(fill="x", pady=(6,0)) tk.Label(help_frame, text="Preset = commande predeterminee", bg="#181825", fg="#a6e3a1", font=("Consolas",8)).pack(anchor="w") tk.Label(help_frame, text="Personnalise = packages a patcher ET/OU a exclure", bg="#181825", fg="#f9e2af", font=("Consolas",8)).pack(anchor="w") tk.Label(help_frame, text="La commande finale est toujours editable avant GO", bg="#181825", fg="#6c7086", font=("Consolas",8)).pack(anchor="w") # Options self.exclude_kernel_var = tk.BooleanVar(value=True) self.dryrun_var = tk.BooleanVar(value=True) tk.Checkbutton(left, text="☑ Exclure kernel (recommandé)", variable=self.exclude_kernel_var, bg=BG, fg=self.YELLOW, selectcolor=BG2, activebackground=BG, font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2)) tk.Checkbutton(left, text="🔍 Dry Run (simulation)", variable=self.dryrun_var, bg=BG, fg=self.ORANGE, selectcolor=BG2, activebackground=BG, font=("Consolas",10,"bold")).pack(anchor="w", pady=2) # Mot de passe CyberArk tk.Label(left, text="Mot de passe CyberArk (prod) :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(12,2)) self.cybpwd_var = tk.StringVar() tk.Entry(left, textvariable=self.cybpwd_var, show="●", bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=28).pack(anchor="w") # GO button self.go_btn = tk.Button(left, text="🚀 GO PATCH", bg="#d20f39", fg="white", font=("Consolas",16,"bold"), pady=12, padx=20, command=self._go_patch) self.go_btn.pack(fill="x", pady=16) self.stop_btn = tk.Button(left, text="⏹ STOP", bg="#6c7086", fg="white", font=("Consolas",11,"bold"), pady=6, state="disabled", command=self._stop) self.stop_btn.pack(fill="x") # Colonne droite : log temps réel right = tk.Frame(top, bg=BG) right.pack(side="left", fill="both", expand=True) tk.Label(right, text="DÉROULÉ EN TEMPS RÉEL", bg=BTN, fg=ACCENT, font=("Consolas",11,"bold"), pady=4).pack(fill="x") self.patch_log = scrolledtext.ScrolledText( right, height=28, bg="#11111b", fg=FG, font=("Consolas",9), insertbackground=FG, state="disabled") self.patch_log.pack(fill="both", expand=True, pady=4) for tag, color in [("ok",self.GREEN),("ko",self.RED), ("warn",self.YELLOW),("info",ACCENT), ("cmd","#cba6f7")]: self.patch_log.tag_configure(tag, foreground=color) # Barre progression pf = tk.Frame(tab, bg=BG) pf.pack(fill="x", padx=10, pady=4) self.prog_lbl = tk.Label(pf, text="En attente...", bg=BG, fg=FG, font=("Consolas",9)) self.prog_lbl.pack(side="left", padx=4) self.prog_var = tk.DoubleVar() ttk.Progressbar(pf, variable=self.prog_var, maximum=100, length=600).pack(side="left", fill="x", expand=True, padx=4) # ========================================================================== # ONGLET 3 — POST-PATCHING # ========================================================================== def _build_tab_post(self): tab = self.tab_post BG=self.BG; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN tk.Label(tab, text="POST-PATCHING", bg=BTN, fg=ACCENT, font=("Consolas",12,"bold"), pady=6).pack(fill="x", padx=10, pady=8) bf = tk.Frame(tab, bg=BG) bf.pack(fill="x", padx=10, pady=4) tk.Button(bf, text="🔍 Check post-patching", bg="#1e66f5", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._post_patch_check).pack(side="left", padx=4) tk.Button(bf, text="🗑 Supprimer anciens kernels", bg="#e64553", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._remove_old_kernels).pack(side="left", padx=4) tk.Button(bf, text="↩ Undo yum", bg="#fe640b", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._undo_yum).pack(side="left", padx=4) tk.Button(bf, text="🔄 Reboot serveurs sélectionnés", bg="#8839ef", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._reboot_servers).pack(side="left", padx=4) tk.Button(bf, text="🟩 Marquer patchés dans Excel", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._ask_mark_patched).pack(side="left", padx=4) # Deuxième rangée de boutons bf2 = tk.Frame(tab, bg=BG) bf2.pack(fill="x", padx=10, pady=2) tk.Button(bf2, text="🗑 Supprimer snapshots > 3 jours", bg="#e64553", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._delete_old_snapshots).pack(side="left", padx=4) self.post_log = scrolledtext.ScrolledText( tab, bg="#11111b", fg=FG, font=("Consolas",9), insertbackground=FG, state="disabled") self.post_log.pack(fill="both", expand=True, padx=10, pady=4) for tag, color in [("ok",self.GREEN),("ko",self.RED), ("warn",self.YELLOW),("info",ACCENT)]: self.post_log.tag_configure(tag, foreground=color) # ========================================================================== # ONGLET 4 — RAPPORT PATCHING (style dashboard) # ========================================================================== def _build_tab_report(self): tab = self.tab_report BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # ── Barre titre + boutons ───────────────────────────────────────────── hf = tk.Frame(tab, bg=BTN, pady=6) hf.pack(fill="x", padx=0, pady=0) tk.Label(hf, text="RAPPORT DE PATCHING", bg=BTN, fg=ACCENT, font=("Consolas",12,"bold")).pack(side="left", padx=12) bf = tk.Frame(hf, bg=BTN) bf.pack(side="right", padx=8) tk.Button(bf, text="Generer", bg="#40a02b", fg="white", font=("Consolas",10,"bold"), pady=3, command=self._generate_report).pack(side="left", padx=4) tk.Button(bf, text="CSV", bg=BTN, fg=FG, font=("Consolas",10), pady=3, command=self._export_csv).pack(side="left", padx=4) tk.Button(bf, text="DokuWiki", bg=BTN, fg=FG, font=("Consolas",10), pady=3, command=self._export_dokuwiki).pack(side="left", padx=4) # ── Widgets KPI (ligne de compteurs) ────────────────────────────────── kpi_frame = tk.Frame(tab, bg=BG) kpi_frame.pack(fill="x", padx=10, pady=8) # Créer 6 widgets KPI self.kpi_widgets = {} kpi_defs = [ ("total", "Total traités", "#313244", FG), ("ok", "Connexion OK", "#1a4a2e", self.GREEN), ("patched", "Patchés", "#1a4a2e", self.GREEN), ("uptodate", "Déjà à jour", "#2a2a3e", ACCENT), ("reboot", "Reboot requis", "#4a3000", self.YELLOW), ("ko", "KO / Auth", "#4a1a1a", self.RED), ] for key, label, bg_col, fg_col in kpi_defs: frame = tk.Frame(kpi_frame, bg=bg_col, padx=16, pady=10, relief="flat", bd=0) frame.pack(side="left", fill="x", expand=True, padx=4) # Valeur val_lbl = tk.Label(frame, text="—", bg=bg_col, fg=fg_col, font=("Consolas",24,"bold")) val_lbl.pack() # Label tk.Label(frame, text=label, bg=bg_col, fg=fg_col, font=("Consolas",9)).pack() self.kpi_widgets[key] = val_lbl # ── Tableau détaillé résultats ───────────────────────────────────────── tk.Label(tab, text="DETAIL PAR SERVEUR", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=12, pady=(4,2)) tree_frame = tk.Frame(tab, bg=BG) tree_frame.pack(fill="both", expand=True, padx=10, pady=4) rcols = ("serveur","env","domaine","accord","snap","connexion","patch","reboot","detail") self.report_tree = ttk.Treeview(tree_frame, columns=rcols, show="headings", height=14) rhdrs = { "serveur": "Serveur", "env": "Env", "domaine": "Domaine", "accord": "Accord", "snap": "Snapshot", "connexion": "Connexion", "patch": "Patch", "reboot": "Reboot", "detail": "Detail packages", } rwidths = { "serveur":200,"env":80,"domaine":100,"accord":70, "snap":80,"connexion":90,"patch":100,"reboot":70,"detail":280 } for c in rcols: self.report_tree.heading(c, text=rhdrs[c]) self.report_tree.column(c, width=rwidths[c], anchor="center" if c not in ("serveur","detail") else "w") self.report_tree.tag_configure("ok", foreground=self.GREEN) self.report_tree.tag_configure("ko", foreground=self.RED) self.report_tree.tag_configure("warn", foreground=self.YELLOW) self.report_tree.tag_configure("patched", foreground=self.GREEN, background="#1a2e1a") self.report_tree.tag_configure("reboot", foreground=self.YELLOW, background="#2e2a00") self.report_tree.tag_configure("uptodate",foreground=ACCENT) vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.report_tree.yview) hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.report_tree.xview) self.report_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) self.report_tree.pack(side="left", fill="both", expand=True) vsb.pack(side="right", fill="y") hsb.pack(side="bottom", fill="x") # ── Zone log texte (optionnel, compact) ─────────────────────────────── tk.Label(tab, text="LOG BRUT", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w", padx=12, pady=(4,0)) self.report_txt = scrolledtext.ScrolledText( tab, height=6, bg="#11111b", fg=FG, font=("Consolas",8), insertbackground=FG, state="disabled") self.report_txt.pack(fill="x", padx=10, pady=(0,6)) for tag, color in [("ok",self.GREEN),("ko",self.RED), ("warn",self.YELLOW),("info",ACCENT), ("title","#cba6f7")]: self.report_txt.tag_configure(tag, foreground=color, font=("Consolas",9,"bold") if tag=="title" else ("Consolas",8)) # ========================================================================== # HELPERS LOG # ========================================================================== def _log(self, widget, msg, tag="info"): def _do(): widget.configure(state="normal") ts = datetime.now().strftime("%H:%M:%S") widget.insert("end", f"[{ts}] {msg}\n", tag) widget.see("end") widget.configure(state="disabled") self.root.after(0, _do) def _log_patch(self, msg, tag="info"): self._log(self.patch_log, msg, tag) def _log_post(self, msg, tag="info"): self._log(self.post_log, msg, tag) # ========================================================================== # PREREQUIS — SSH + Disque + Satellite + Snapshot # ========================================================================== def _prompt_password(self, title, label): """Prompt mot de passe centre sur le parent. Retourne le mdp ou None.""" dlg = tk.Toplevel(self.root) dlg.title(title) dlg.configure(bg="#1e1e2e") dlg.resizable(False, False) result = [None] tk.Label(dlg, text=label, bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(padx=20, pady=(15, 5)) pwd_var = tk.StringVar() e = tk.Entry(dlg, textvariable=pwd_var, show="*", bg="#2a2a3e", fg="#cdd6f4", font=("Consolas", 11), insertbackground="#cdd6f4", width=30) e.pack(padx=20, pady=5) e.focus_set() def _ok(event=None): result[0] = pwd_var.get().strip() dlg.destroy() e.bind("", _ok) tk.Button(dlg, text="OK", bg="#40a02b", fg="white", font=("Consolas", 10, "bold"), padx=20, command=_ok).pack(pady=10) center_window(dlg, 400, 140, parent=self.root) dlg.grab_set() dlg.wait_window() return result[0] def _ensure_psmp_pwd(self): """S'assure que le mot de passe PSMP est disponible (session). Retourne True si OK.""" if self.cyb_pwd: return True pwd = self.cybpwd_var.get().strip() if hasattr(self, 'cybpwd_var') else "" if pwd: self.cyb_pwd = pwd return True pwd = self._prompt_password("CyberArk PSMP", f"Mot de passe CyberArk ({self.settings.get('cybr_user', 'CYBP01336')}) :") if pwd: self.cyb_pwd = pwd if hasattr(self, 'cybpwd_var'): self.cybpwd_var.set(pwd) return True return False def _ensure_vcenter_pwd(self): """S'assure que les credentials vCenter sont disponibles (session). Retourne (user, pwd) ou None.""" vs_user = self.settings.get("vs_user", "").strip() vs_pwd = self.settings.get("_vs_pwd_session", "").strip() if vs_user and vs_pwd: return vs_user, vs_pwd if not vs_user: vs_user = self._prompt_password("vCenter", "Utilisateur vCenter :") if not vs_user: return None self.settings["vs_user"] = vs_user if not vs_pwd: vs_pwd = self._prompt_password("vCenter", f"Mot de passe vCenter ({vs_user}) :") if not vs_pwd: return None self.settings["_vs_pwd_session"] = vs_pwd return vs_user, vs_pwd def _check_prerequisites(self): selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"] if not selected: messagebox.showwarning("Prerequis", "Aucun serveur selectionne avec accord=OUI") return self._reload_keys() # PSMP si serveurs prod has_prod = any("prod" in s.get("env", "").lower() for s in selected) if has_prod: if not self._ensure_psmp_pwd(): messagebox.showwarning("Prerequis", "Mot de passe PSMP requis pour les serveurs Production") return self.prereq_btn.configure(state="disabled", text="Verification en cours...") self.prereq_status.configure(text="") self.audit("PREREQ_START", f"{len(selected)} serveurs (prod={has_prod})") self._log_patch(f"Prerequis : {len(selected)} serveurs a verifier...", "info") def worker(): results = {} total = len(selected) for idx, s in enumerate(selected): server = s["server"] env = s.get("env", "") r = {"ssh": False, "disk_root": None, "disk_log": None, "satellite": None, "is_vm": server.lower().startswith("v")} self.root.after(0, lambda sv=server, i=idx: self.prereq_status.configure( text=f"[{i+1}/{total}] {sv}")) self._log_patch(f"\n[{idx+1}/{total}] {server} ({env})", "info") # 1. Check SSH self._log_patch(f" SSH...", "info") is_prod = "prod" in env.lower() method = "PSMP" if is_prod else "cle SSH" client, eff = ssh_connect(server, env, self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: r["ssh"] = False r["error"] = "Connexion SSH impossible" self._log_patch(f" SSH ({method}) ECHEC — Authentification KO", "ko") # Si PSMP echoue, invalider le mot de passe pour reproposer if is_prod and method == "PSMP": self._log_patch(f" Verifier le mot de passe CyberArk ou le compte {self.settings.get('cybr_user','')}", "ko") results[server] = r continue r["ssh"] = True self._log_patch(f" SSH ({method}) Authentification OK : {eff}", "ok") # 2. Check disque / self._log_patch(f" Disque / ...", "info") out, _ = run_cmd(client, "df -BM / | awk 'NR==2{print $4}' | tr -d 'M'", 10) try: r["disk_root"] = int(out.strip()) except (ValueError, AttributeError): r["disk_root"] = -1 if r["disk_root"] >= 1200: self._log_patch(f" Disque / : {r['disk_root']} Mo libre (OK >= 1200)", "ok") else: self._log_patch(f" Disque / : {r['disk_root']} Mo libre (KO < 1200)", "ko") # Check disque /var/log self._log_patch(f" Disque /var/log ...", "info") out2, _ = run_cmd(client, "df -BM /var/log | awk 'NR==2{print $4}' | tr -d 'M'", 10) try: r["disk_log"] = int(out2.strip()) except (ValueError, AttributeError): r["disk_log"] = -1 if r["disk_log"] >= 500: self._log_patch(f" Disque /var/log : {r['disk_log']} Mo libre (OK >= 500)", "ok") else: self._log_patch(f" Disque /var/log : {r['disk_log']} Mo libre (KO < 500)", "ko") # 3. Check Satellite self._log_patch(f" Satellite ...", "info") sat_out, _ = run_cmd(client, "sudo subscription-manager status 2>/dev/null | head -10", 15) sat_low = sat_out.lower() if sat_out else "" if "current" in sat_low or "valide" in sat_low or "disabled" in sat_low or "content access" in sat_low: r["satellite"] = True mode = "SCA" if "content access" in sat_low or "disabled" in sat_low else "classique" self._log_patch(f" Satellite : OK ({mode})", "ok") elif "unknown" in sat_low or "not registered" in sat_low: r["satellite"] = False self._log_patch(f" Satellite : NON enregistre", "ko") elif sat_out and ("status" in sat_low or "system" in sat_low): r["satellite"] = True self._log_patch(f" Satellite : OK (reponse detectee)", "ok") else: r["satellite"] = None self._log_patch(f" Satellite : N/A (subscription-manager absent)", "warn") client.close() results[server] = r # Afficher les resultats dans l'UI self.root.after(0, lambda: self._show_prereq_results(results, selected)) threading.Thread(target=worker, daemon=True).start() def _show_prereq_results(self, results, selected): self.prereq_btn.configure(state="normal", text="Verifier prerequis") self._log_patch(f"\n{'='*50}", "info") self._log_patch(f"RESUME PREREQUIS", "info") self._log_patch(f"{'='*50}", "info") # Construire le rapport ok_servers = [] ko_servers = [] warn_servers = [] for s in selected: server = s["server"] r = results.get(server, {}) if not r.get("ssh"): ko_servers.append((server, "SSH connexion impossible")) s["prereq_status"] = "SSH_KO" elif r.get("disk_root", 0) >= 0 and r["disk_root"] < 1200: ko_servers.append((server, f"/ : {r['disk_root']} Mo libre (min 1200 Mo)")) s["prereq_status"] = "DISK_KO" elif r.get("disk_log", 0) >= 0 and r["disk_log"] < 500: ko_servers.append((server, f"/var/log : {r['disk_log']} Mo libre (min 500 Mo)")) s["prereq_status"] = "DISK_KO" elif r.get("satellite") is False: ko_servers.append((server, "Satellite : non enregistre")) s["prereq_status"] = "SAT_KO" else: ok_servers.append(server) s["prereq_status"] = "OK" if r.get("satellite") is None: warn_servers.append((server, "subscription-manager absent (Debian/physique ?)")) # Log resume for srv in ok_servers: self._log_patch(f" OK : {srv}", "ok") for srv, reason in warn_servers: self._log_patch(f" WARN : {srv} — {reason}", "warn") for srv, reason in ko_servers: self._log_patch(f" KO : {srv} — {reason}", "ko") self._log_patch(f"\nTotal : {len(ok_servers)} OK, {len(warn_servers)} WARN, {len(ko_servers)} KO", "info") # Dialogue resultat msg = f"Prerequis verifies : {len(selected)} serveurs\n\n" msg += f"OK : {len(ok_servers)}\n" if warn_servers: msg += f"Avertissements : {len(warn_servers)}\n" if ko_servers: msg += f"BLOQUANTS : {len(ko_servers)}\n\n" msg += "Serveurs bloques :\n" for srv, reason in ko_servers: msg += f" {srv} : {reason}\n" msg += "\nDeselectionnez les serveurs bloques pour continuer." if ko_servers: # Deselectionner automatiquement les KO for srv, _ in ko_servers: for s in self.servers: if s["server"] == srv: s["selected"] = False self._apply_filters() messagebox.showwarning("Prerequis — problemes detectes", msg) self.prereq_status.configure(text=f"{len(ko_servers)} serveurs bloques", fg=self.RED) else: messagebox.showinfo("Prerequis OK", msg) self.prereq_status.configure(text=f"{len(ok_servers)} serveurs OK", fg=self.GREEN) # Activer le bouton snapshot pour les VM vm_count = sum(1 for s in selected if s.get("prereq_status") == "OK" and s["server"].lower().startswith("v")) if vm_count > 0: self.snap_btn.configure(state="normal", bg="#e64553", fg="white") self.audit("PREREQ_DONE", f"OK={len(ok_servers)} KO={len(ko_servers)} WARN={len(warn_servers)}") # ========================================================================== # SNAPSHOT — force sans snap # ========================================================================== def _force_patch_without_snap(self, server_name): """Demande une justification pour patcher une VM sans snapshot""" dlg = tk.Toplevel(self.root) dlg.title("Patching sans snapshot") dlg.configure(bg="#1e1e2e") dlg.resizable(False, False) result = [None] tk.Label(dlg, text=f"ATTENTION — VM sans snapshot", bg="#181825", fg="#f38ba8", font=("Consolas", 12, "bold"), pady=8).pack(fill="x") tk.Label(dlg, text=f"Serveur : {server_name}", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(pady=(10, 5)) tk.Label(dlg, text="Justification obligatoire :", bg="#1e1e2e", fg="#f9e2af", font=("Consolas", 10)).pack(anchor="w", padx=20) justif_var = tk.StringVar() tk.Entry(dlg, textvariable=justif_var, bg="#2a2a3e", fg="#cdd6f4", font=("Consolas", 10), insertbackground="#cdd6f4", width=50).pack(padx=20, pady=5) def accept(): j = justif_var.get().strip() if len(j) < 5: messagebox.showwarning("Justification", "Minimum 5 caracteres") return result[0] = j dlg.destroy() bf = tk.Frame(dlg, bg="#1e1e2e") bf.pack(pady=10) tk.Button(bf, text="Forcer le patching", bg="#d20f39", fg="white", font=("Consolas", 10, "bold"), command=accept).pack(side="left", padx=8) tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4", font=("Consolas", 10), command=dlg.destroy).pack(side="left", padx=8) center_window(dlg, 500, 220, parent=self.root) dlg.grab_set() dlg.wait_window() if result[0]: self.audit("FORCE_PATCH_NO_SNAP", f"{server_name} : {result[0]}") return result[0] # ========================================================================== # A PROPOS # ========================================================================== def _show_about(self): dlg = tk.Toplevel(self.root) dlg.title("A propos") dlg.configure(bg="#1e1e2e") dlg.resizable(False, False) tk.Frame(dlg, bg="#e94560", height=3).pack(fill="x") tk.Label(dlg, text="SANEF Linux Patch Manager", bg="#1e1e2e", fg="#89b4fa", font=("Consolas", 16, "bold")).pack(pady=(20, 4)) tk.Label(dlg, text=f"Version {VERSION}", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 12)).pack() tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12) tk.Label(dlg, text="SANEF DSI — Securite Operationnelle", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack() tk.Label(dlg, text="(KM)", bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 11)).pack(pady=(4, 0)) tk.Label(dlg, text="SECOPS 2026", bg="#1e1e2e", fg="#6c7086", font=("Consolas", 10)).pack(pady=(2, 0)) tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12) tk.Label(dlg, text="Orchestration du patching Linux\nvia Excel + SSH + vSphere", bg="#1e1e2e", fg="#6c7086", font=("Consolas", 9), justify="center").pack() tk.Button(dlg, text="Fermer", bg="#313244", fg="#cdd6f4", font=("Consolas", 10), padx=20, pady=4, command=dlg.destroy).pack(pady=16) center_window(dlg, 380, 320, parent=self.root) dlg.grab_set() # ========================================================================== # SETTINGS # ========================================================================== def _open_settings(self): dlg = SettingsDialog(self.root, self.settings) if dlg.result: self.settings = dlg.result self._reload_keys() def _reload_keys(self): self.pkey = load_key(self.settings.get("keyfile","")) self.pkey2 = load_key(self.settings.get("keyfile2","")) # ========================================================================== # ONGLET 1 — LOGIQUE # ========================================================================== def _browse_excel(self): path = filedialog.askopenfilename( filetypes=[("Excel","*.xlsx *.xls"),("All","*.*")]) if path: self.excel_var.set(path) self.settings["excel_file"] = path save_settings(self.settings) self._load_excel() def _prev_week(self): sheets = list(self.sheet_cb["values"]) if not sheets: return cur = self.sheet_var.get() idx = sheets.index(cur) if cur in sheets else 0 if idx > 0: self.sheet_var.set(sheets[idx-1]) self._load_sheet() def _next_week(self): sheets = list(self.sheet_cb["values"]) if not sheets: return cur = self.sheet_var.get() idx = sheets.index(cur) if cur in sheets else 0 if idx < len(sheets)-1: self.sheet_var.set(sheets[idx+1]) self._load_sheet() def _goto_current_week(self): week_num = date.today().isocalendar()[1] sheets = list(self.sheet_cb["values"]) for s in [f"S{week_num}", f"S{week_num:02d}"]: if s in sheets: self.sheet_var.set(s) self._load_sheet() return def _load_excel(self): path = self.excel_var.get().strip() if not path or not os.path.exists(path): messagebox.showerror("Erreur", f"Fichier introuvable :\n{path}") return self.audit("LOAD_EXCEL", path) try: import io try: with open(path, "rb") as f: data = io.BytesIO(f.read()) wb = openpyxl.load_workbook(data, read_only=True, data_only=True) except Exception: wb = openpyxl.load_workbook(path, read_only=True, data_only=True) sheets = wb.sheetnames wb.close() # Mettre à jour le combobox feuilles self.sheet_cb["values"] = sheets week_sheet = None week_num = date.today().isocalendar()[1] for s in sheets: if str(week_num) in s: week_sheet = s break self.sheet_var.set(week_sheet or (sheets[0] if sheets else "")) self._load_sheet() except Exception as e: messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}") def _load_sheet(self): path = self.excel_var.get().strip() sheet = self.sheet_var.get() if not path or not sheet: return servers, _ = read_excel_servers(path, sheet) if servers is None: messagebox.showerror("Erreur", "Aucun serveur Linux trouvé") return # Détecter si c'est la semaine courante week_num = date.today().isocalendar()[1] self.is_current_week = any(str(week_num) in sheet for sheet in [sheet]) # Mettre à jour l'indicateur visuel if self.is_current_week: self.week_mode_lbl.configure( text="✅ SEMAINE COURANTE — Patch autorisé", fg=self.GREEN) else: self.week_mode_lbl.configure( text=f"👁 VUE SEULE ({sheet}) — Patch désactivé", fg=self.YELLOW) self.servers = servers # Peupler les filtres envs = sorted(set(s["env"] for s in servers if s["env"])) domains = sorted(set(s["domain"] for s in servers if s["domain"])) apps = sorted(set(s["app"] for s in servers if s["app"])) patchers = sorted(set(s["patcher"].strip() for s in servers if s.get("patcher") and s["patcher"].strip())) self.filter_env_cb["values"] = ["Tous"] + envs self.filter_domain_cb["values"] = ["Tous"] + domains self.filter_app_cb["values"] = ["Tous"] + apps if self.filter_patcher_cb: self.filter_patcher_cb["values"] = ["Tous"] + patchers self.filter_env_var.set("Tous") self.filter_domain_var.set("Tous") self.filter_app_var.set("Tous") # Auto-selectionner le patcheur if self.current_user["role"] == "admin": patcher_setting = self.settings.get("patcher", "").strip() if patcher_setting and patcher_setting in patchers: self.filter_patcher_var.set(patcher_setting) else: self.filter_patcher_var.set("Tous") else: self.filter_patcher_var.set("Tous") # Non-admin filtre par _apply_filters self._apply_filters() # Griser/activer le bouton GO selon la semaine if hasattr(self, "go_btn"): if self.is_current_week: self.go_btn.configure(bg="#d20f39", state="normal", text="🚀 GO PATCH") else: self.go_btn.configure(bg="#45475a", state="normal", text="👁 VUE SEULE — Patch désactivé") # Vérifier snapshots si des serveurs ont accord=oui accord_servers = [s for s in self.servers if s.get("accord") == "oui" and not s.get("is_physical")] if accord_servers and VSPHERE_OK: self.root.after(200, self._prompt_vcenter_and_check_snaps) def _apply_filters(self): env_f = self.filter_env_var.get() domain_f = self.filter_domain_var.get() app_f = self.filter_app_var.get() patcher_f = self.filter_patcher_var.get() today_f = self.filter_today_var.get() today = date.today() # Non-admin : ne voit que ses serveurs (intervenant = son username) is_admin = self.current_user["role"] == "admin" my_name = self.current_user["username"].lower() filtered = [] for s in self.servers: # Filtre par utilisateur connecte (non-admin) if not is_admin: srv_patcher = (s.get("patcher", "") or "").strip().lower() if srv_patcher != my_name: continue if patcher_f != "Tous": srv_patcher = (s.get("patcher", "") or "").strip() patcher_cmp = patcher_f.strip() if srv_patcher and srv_patcher.lower() != patcher_cmp.lower(): continue if env_f != "Tous" and s["env"] != env_f: continue if domain_f != "Tous" and s["domain"] != domain_f: continue if app_f != "Tous" and s["app"] != app_f: continue if today_f and s["date_patch"] and s["date_patch"] != today: continue filtered.append(s) self._populate_tree(filtered) def _populate_tree(self, servers): for item in self.srv_tree.get_children(): self.srv_tree.delete(item) # Compteur pour gérer les doublons de nom de serveur dans la feuille seen = {} for s in servers: accord_ok = s["accord"] == "oui" sel_mark = "✓" if s.get("selected") and accord_ok else ("✗" if accord_ok else "—") accord_lbl= "✅ OUI" if accord_ok else "❌ NON" snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "—") date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else "—" if s.get("already_patched"): tag = "already_patched" elif not accord_ok: tag = "no_accord" elif s.get("selected"): tag = "selected" else: tag = "normal" # Générer un iid unique si le nom de serveur apparaît en double base_iid = s["server"] seen[base_iid] = seen.get(base_iid, 0) + 1 iid = base_iid if seen[base_iid] == 1 else f"{base_iid}_#{seen[base_iid]}" s["_iid"] = iid # stocker pour _refresh_tree_item self.srv_tree.insert("", "end", iid=iid, values=(sel_mark, accord_lbl, s["server"], s.get("patcher",""), s["env"], s["domain"], s["app"], date_str, snap_lbl, s["status"]), tags=(tag,)) n = sum(1 for s in servers if s.get("selected") and s["accord"]=="oui") self.count_lbl.configure(text=f"{len(servers)} serveurs affichés — {n} sélectionnés") def _get_server(self, name): for s in self.servers: if s["server"] == name: return s return None def _get_server_by_iid(self, iid): """Retrouver un serveur par son iid treeview (gère les doublons de noms).""" for s in self.servers: if s.get("_iid") == iid: return s # Fallback : chercher par nom (compatibilité) return self._get_server(iid) def _toggle_srv(self, event): region = self.srv_tree.identify_region(event.x, event.y) if region != "cell": return item = self.srv_tree.identify_row(event.y) if not item: return # Retrouver le serveur par son iid stocké (gère les doublons) s = self._get_server_by_iid(item) if not s or s["accord"] != "oui": return # accord obligatoire s["selected"] = not s.get("selected", False) self._refresh_tree_item(s) n = sum(1 for x in self.servers if x.get("selected") and x["accord"]=="oui") self.count_lbl.configure(text=f"— {n} sélectionnés") def _refresh_tree_item(self, s): # Chercher l'iid : d'abord _iid stocké, sinon nom serveur, sinon chercher dans l'arbre iid = s.get("_iid", s["server"]) if not self.srv_tree.exists(iid): # Fallback : chercher parmi tous les items de l'arbre par valeur "server" for item in self.srv_tree.get_children(): vals = self.srv_tree.item(item, "values") if vals and len(vals) > 2 and vals[2] == s["server"]: iid = item s["_iid"] = iid # mémoriser pour la prochaine fois break else: return # serveur pas dans l'arbre (filtré) accord_ok = s["accord"] == "oui" sel_mark = "✓" if s.get("selected") else "✗" snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "—") date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else "—" tag = "selected" if s.get("selected") else "normal" self.srv_tree.item(iid, values=( sel_mark, "✅ OUI" if accord_ok else "❌ NON", s["server"], s.get("patcher",""), s["env"], s["domain"], s["app"], date_str, snap_lbl, s["status"]), tags=(tag,)) def _select_all_ok(self): for s in self.servers: if s["accord"] == "oui" and self.srv_tree.exists(s["server"]): s["selected"] = True self._refresh_tree_item(s) n = sum(1 for s in self.servers if s.get("selected")) self.count_lbl.configure(text=f"— {n} sélectionnés") def _select_none(self): for s in self.servers: s["selected"] = False if self.srv_tree.exists(s["server"]): self._refresh_tree_item(s) self.count_lbl.configure(text="— 0 sélectionnés") def _invert(self): for s in self.servers: if s["accord"] == "oui" and self.srv_tree.exists(s["server"]): s["selected"] = not s.get("selected", False) self._refresh_tree_item(s) n = sum(1 for s in self.servers if s.get("selected")) self.count_lbl.configure(text=f"— {n} sélectionnés") # ========================================================================== # SNAPSHOT VSPHERE # ========================================================================== def _prompt_vcenter_and_check_snaps(self): """Verifie les snapshots — prompt vCenter si pas encore saisi dans la session.""" accord_servers = [s for s in self.servers if s.get("selected") and s.get("accord") == "oui" and not s.get("is_physical")] if not accord_servers: return creds = self._ensure_vcenter_pwd() if not creds: self._log_patch("Verification snapshot ignoree (pas de credentials vCenter)", "warn") return vs_user, vs_pwd = creds self._log_patch(f"Verification snapshots ({len(accord_servers)} serveurs)...", "info") threading.Thread(target=self._check_snap_worker, args=(accord_servers, vs_user, vs_pwd), daemon=True).start() def _get_snap_identifier(self): """Retourne le nom/identifiant pour le snapshot : patcheur ou username""" patcher = self.settings.get("patcher", "").strip() if patcher: return patcher return self.current_user.get("username", "secops") def _check_snap_worker(self, servers, vs_user, vs_pwd): """Thread : v\u00e9rifie l'existence d'un snapshot r\u00e9cent du patcheur sur les vCenters.""" snap_id = self._get_snap_identifier() vc_errors = [] from datetime import timezone, timedelta def has_recent_snap(vm): """Retourne True si la VM a un snapshot du patcheur de moins d'1 heure""" if not vm.snapshot or not vm.snapshot.rootSnapshotList: return False now = datetime.now(timezone.utc) def walk(snap_list): for snap in snap_list: # Snap du patcheur (nom contient l'identifiant) if snap.name.startswith("SLPM_") and snap_id.lower() in snap.name.lower(): # Verifier age < 1 heure try: snap_time = snap.createTime if snap_time.tzinfo is None: snap_time = snap_time.replace(tzinfo=timezone.utc) age = (now - snap_time).total_seconds() if age < 3600: return True except Exception: return True # En cas de doute, considerer comme existant if snap.childSnapshotList and walk(snap.childSnapshotList): return True return False return walk(vm.snapshot.rootSnapshotList) # Connexion a tous les vCenters une seule fois import ssl vc_connections = {} # vc_host -> (si, content) for vc in VSPHERE_HOSTS: try: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx) vc_connections[vc] = (si, si.RetrieveContent()) self._log_patch(f" vCenter {vc} : connecte", "ok") except Exception as e: err_str = str(e) if "10060" in err_str or "timed out" in err_str.lower(): self._log_patch(f" vCenter {vc} : INJOIGNABLE (timeout)", "ko") elif "10061" in err_str or "refused" in err_str.lower(): self._log_patch(f" vCenter {vc} : CONNEXION REFUSEE", "ko") elif "auth" in err_str.lower() or "login" in err_str.lower() or "401" in err_str: self._log_patch(f" vCenter {vc} : AUTHENTIFICATION ECHOUEE", "ko") else: self._log_patch(f" vCenter {vc} : ERREUR — {err_str[:80]}", "ko") vc_errors.append(vc) # Pour chaque serveur, chercher dans l'ordre prioritaire de ses vCenters for s in servers: if s.get("snap_done"): continue vc_order = get_vcenter_order_for_server(s["server"]) for vc in vc_order: if vc not in vc_connections: continue _, content = vc_connections[vc] vm = self._find_vm(content, s["server"]) if not vm: continue if has_recent_snap(vm): s["snap_done"] = True self._log_patch(f" ✅ Snapshot existant : {s['server']} ({vc.split('.')[0]})", "ok") self.root.after(0, lambda sv=s: self._refresh_tree_item(sv)) else: self._log_patch(f" ⚠ Pas de snapshot : {s['server']} ({vc.split('.')[0]})", "warn") break # VM trouvee sur ce vCenter, pas besoin de chercher les autres # Deconnexion for vc, (si, _) in vc_connections.items(): try: Disconnect(si) except: pass # Bilan ok = sum(1 for s in servers if s.get("snap_done")) ko = len(servers) - ok not_found = [s["server"] for s in servers if not s.get("snap_done")] if vc_errors: self._log_patch(f"vCenter injoignables : {', '.join(vc_errors)} — les VMs de ces vCenter n'ont pas pu etre verifiees", "ko") if not_found: self._log_patch(f"VMs sans snapshot : {', '.join(not_found[:10])}", "warn") self._log_patch( f"Bilan snapshots : {ok} OK, {ko} manquant(s) sur {len(servers)} serveur(s)", "ok" if ko == 0 else "warn") def _center_dialog(self, dlg, w, h): """Centre un dialog sur le meme ecran que la fenetre principale.""" center_window(dlg, w, h, parent=self.root) def _take_snapshots(self): selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not selected: messagebox.showwarning("Attention", "Aucun serveur sélectionné (avec accord).") return if not VSPHERE_OK: messagebox.showwarning("pyVmomi manquant", "pyVmomi non installé.\n\n" "Installer avec :\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080\n\n" "Après installation, relancer l'application.") return creds = self._ensure_vcenter_pwd() if not creds: self._log_patch("Snapshot annule (pas de credentials vCenter)", "warn") return vs_user, vs_pwd = creds self._log_patch(f"Creation snapshots ({len(selected)} serveurs)...", "info") threading.Thread(target=self._snap_worker, args=(selected, vs_user, vs_pwd), daemon=True).start() def _snap_worker(self, servers, vs_user, vs_pwd): snap_id = self._get_snap_identifier() snap_name = f"SLPM_{snap_id}_{datetime.now():%Y%m%d_%H%M}" from datetime import timezone as _tz def has_recent_snap(vm): if not vm.snapshot or not vm.snapshot.rootSnapshotList: return False now = datetime.now(_tz.utc) def walk(snap_list): for snap in snap_list: if snap.name.startswith("SLPM_") and snap_id.lower() in snap.name.lower(): try: st = snap.createTime if st.tzinfo is None: st = st.replace(tzinfo=_tz.utc) if (now - st).total_seconds() < 3600: return True except Exception: return True if snap.childSnapshotList and walk(snap.childSnapshotList): return True return False return walk(vm.snapshot.rootSnapshotList) # Connexion a tous les vCenters une seule fois import ssl vc_connections = {} for vc in VSPHERE_HOSTS: self._log_patch(f"Connexion vCenter : {vc}", "info") try: ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx) vc_connections[vc] = (si, si.RetrieveContent()) self._log_patch(f" vCenter {vc} : connecte", "ok") except Exception as e: err_str = str(e) if "10060" in err_str or "timed out" in err_str.lower(): self._log_patch(f" vCenter {vc} : INJOIGNABLE (timeout)", "ko") elif "10061" in err_str or "refused" in err_str.lower(): self._log_patch(f" vCenter {vc} : CONNEXION REFUSEE", "ko") elif "auth" in err_str.lower() or "login" in err_str.lower(): self._log_patch(f" vCenter {vc} : AUTHENTIFICATION ECHOUEE", "ko") else: self._log_patch(f" vCenter {vc} : ERREUR — {err_str[:80]}", "ko") # Pour chaque serveur, chercher/creer le snapshot dans l'ordre prioritaire for s in servers: if s.get("snap_done"): continue vc_order = get_vcenter_order_for_server(s["server"]) for vc in vc_order: if vc not in vc_connections: continue _, content = vc_connections[vc] vm = self._find_vm(content, s["server"]) if not vm: continue # VM trouvee sur ce vCenter if has_recent_snap(vm): s["snap_done"] = True self._log_patch(f" Snapshot recent existant : {s['server']} ({vc.split('.')[0]})", "ok") self.root.after(0, lambda sv=s: self._refresh_tree_item(sv)) else: try: task = vm.CreateSnapshot_Task( name=snap_name, description=f"Pre-patch automatique {date.today()}", memory=False, quiesce=False) while task.info.state not in ("success","error"): time.sleep(1) if task.info.state == "success": s["snap_done"] = True self._log_patch(f" Snapshot OK : {s['server']} ({vc.split('.')[0]})", "ok") self.audit("SNAP_CREATE", f"{s['server']} | {snap_name} | OK") self.root.after(0, lambda sv=s: self._refresh_tree_item(sv)) else: self._log_patch(f" Snapshot KO : {s['server']} ({task.info.error})", "ko") self.audit("SNAP_CREATE", f"{s['server']} | {snap_name} | ECHEC") except Exception as e: self._log_patch(f" {s['server']} : {e}", "ko") break # VM trouvee, pas besoin de chercher les autres vCenters # Deconnexion for vc, (si, _) in vc_connections.items(): try: Disconnect(si) except: pass # Bilan final ok = [s for s in servers if s.get("snap_done")] ko = [s for s in servers if not s.get("snap_done") and not s.get("is_physical")] for s in ko: self._log_patch(f" VM non trouvee sur aucun vCenter : {s['server']}", "warn") self._log_patch( f"Snapshots terminés : {len(ok)}/{len(servers)} OK", "ok" if not ko else "warn") # Rafraîchir l'arbre complet à la fin et basculer vers l'onglet Patch def _finish_snap(): for sv in servers: self._refresh_tree_item(sv) self.nb.select(self.tab_patch) self.root.after(0, _finish_snap) def _find_vm(self, content, name): """Cherche une VM par nom exact ou partiel dans tout le vCenter""" name_short = name.split(".")[0].lower() # Sans le domaine def search_folder(folder): try: for obj in folder.childEntity: if hasattr(obj, "childEntity"): result = search_folder(obj) if result: return result if hasattr(obj, "name"): vm_name = obj.name.lower() if vm_name == name.lower() or vm_name == name_short: return obj if name_short in vm_name or vm_name in name_short: return obj except Exception: pass return None try: for dc in content.rootFolder.childEntity: result = search_folder(dc.vmFolder) if result: return result except Exception: pass return None # ========================================================================== # ONGLET 2 — LOGIQUE PATCH # ========================================================================== def _on_preset(self, event=None): val = PRESET_PACKAGES.get(self.preset_var.get(), "") if val == "CUSTOM": # Mode personnalise — activer les champs self.packages_entry.configure(state="normal") self.custom_excludes_entry.configure(state="normal") self.packages_var.set("") self.custom_excludes_var.set("") else: self.packages_var.set(val) self.custom_excludes_var.set("") self._preview_cmd() def _preview_cmd(self): """Genere la commande yum et l'affiche dans le champ editable""" preset = self.preset_var.get() packages = self.packages_var.get().strip() custom_excl = self.custom_excludes_var.get().strip() exclude_kernel = self.exclude_kernel_var.get() dryrun = self.dryrun_var.get() if preset == "Personnalise..." or custom_excl: # Mode personnalise if packages and not custom_excl: # Packages specifiques a patcher uniquement if dryrun: cmd = f"sudo yum check-update {packages} 2>/dev/null | head -30" else: cmd = f"sudo yum update -y {packages} 2>&1 | tail -25" elif custom_excl: # Excludes personnalises excl_parts = " ".join(f"--exclude={e.strip()}" for e in custom_excl.split() if e.strip()) if exclude_kernel and "--exclude=kernel" not in excl_parts.lower(): excl_parts += " --exclude=*kernel*" if packages: if dryrun: cmd = f"sudo yum check-update {packages} {excl_parts} 2>/dev/null | head -30" else: cmd = f"sudo yum update -y {packages} {excl_parts} 2>&1 | tail -25" else: if dryrun: cmd = f"sudo yum check-update {excl_parts} 2>/dev/null | head -30" else: cmd = f"sudo yum update -y {excl_parts} 2>&1 | tail -25" else: cmd = "sudo yum check-update 2>/dev/null | head -30" if dryrun else "sudo yum update -y 2>&1 | tail -25" else: # Mode preset standard — utilise build_yum_command cmd = build_yum_command("standard", packages, exclude_kernel, dryrun) cmd = "sudo " + cmd if not cmd.startswith("sudo") else cmd self.final_cmd_var.set(cmd) def _go_patch(self): # Bloquer si pas la semaine courante if not self.is_current_week: messagebox.showwarning( "Patch desactive", "Vous consultez une autre semaine que la semaine courante.\n\n" "Le patch n'est autorise que sur la semaine en cours.\n" "Utilisez le bouton ↺ pour revenir a la semaine courante.") return selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not selected: messagebox.showwarning("Attention", "Aucun serveur sélectionné avec accord.") return # Vérifier snapshot pour les VMs no_snap = [s for s in selected if not s.get("snap_done") and not s.get("is_physical")] if no_snap: names = "\n".join(s["server"] for s in no_snap[:8]) ans = messagebox.askyesno( "⚠️ Snapshot manquant", f"{len(no_snap)} serveur(s) sans snapshot :\n{names}\n\n" "Continuer SANS snapshot ?", icon="warning") if not ans: return if not self.dryrun_var.get(): if not messagebox.askyesno( "Confirmation PATCH RÉEL", f"Lancer le PATCH RÉEL sur {len(selected)} serveur(s) ?\n\n" "Cette action est irréversible !", icon="warning"): return self.cyb_pwd = self.cybpwd_var.get().strip() or None self.running = True self.apply_all_cmd = False # Reprise apres STOP : garder les statuts et results # Nouveau lancement : tout remettre a zero if getattr(self, "_stopped", False): self._stopped = False else: self.results = [] for s in selected: if s.get("status") not in ("✅ DÉJÀ PATCHÉ",): s["status"] = "—" s["patch_status"] = "—" s["patch_detail"] = "" s["reboot_required"] = False s.pop("_teams_debut_sent", None) s.pop("_teams_fin_sent", None) s.pop("dep_excluded", None) self.go_btn.configure(state="disabled") self.stop_btn.configure(state="normal") threading.Thread( target=self._patch_worker, args=(selected,), daemon=True).start() def _stop(self): self.running = False self._stopped = True # Distinguer STOP (reprise) vs fin normale self._log_patch("⏹ Arrêt demandé...", "warn") # Envoyer annulation Teams pour les serveurs en cours (debut envoyé mais pas fin) for s in self.servers: if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"): self._send_teams_notification(s, "annulation") s["_teams_fin_sent"] = True # Marquer comme terminé pour éviter doublon @staticmethod def _rpm_name(full_nevra): """Extrait le nom du paquet depuis un NEVRA complet. Ex: runc-1.1.12-4.el8.x86_64 → runc container-selinux-2:2.229.0-2.el8.noarch → container-selinux""" # Retirer l'arch (.x86_64, .noarch, .i686) for arch in (".x86_64", ".noarch", ".i686", ".aarch64", ".i386", ".src"): if full_nevra.endswith(arch): full_nevra = full_nevra[:-len(arch)] break # Retirer version-release : derniers 2 segments apres "-" parts = full_nevra.rsplit("-", 2) if len(parts) == 3 and (any(c.isdigit() for c in parts[1]) or ":" in parts[1]): return parts[0] if len(parts) >= 2 and any(c.isdigit() for c in parts[-1]): return "-".join(parts[:-1]) return full_nevra def _detect_dep_errors(self, yum_output): """Analyse la sortie yum pour detecter les erreurs de dependances. Retourne (has_error, dep_packages_to_exclude, error_detail)""" import re has_error = False pkgs_to_exclude = set() details = [] lines = yum_output.splitlines() in_error_block = False for i, line in enumerate(lines): low = line.lower().strip() # Detecter le bloc d'erreur if low.startswith("error:") or low.startswith("problem:"): has_error = True in_error_block = True if not in_error_block: continue # "package xxx is filtered out by exclude filtering" # → le paquet exclu empeche une dependance if "is filtered out by exclude" in low or "is excluded" in low: m = re.search(r'package\s+(\S+)', line, re.I) if m: details.append(line.strip()) # "package xxx requires yyy, but none of the providers can be installed" # → xxx ne peut pas etre installe, il faut l'exclure if "requires" in low and ("none of the providers" in low or "excluded" in low or "filtered" in low): m = re.search(r'package[:\s]+(\S+)', line, re.I) if m: pkg = self._rpm_name(m.group(1)) pkgs_to_exclude.add(pkg) details.append(f"{pkg} : dependance bloquee") # "nothing provides xxx needed by yyy" # → yyy ne peut pas etre installe if "nothing provides" in low and "needed by" in low: m = re.search(r'needed by\s+(\S+)', line, re.I) if m: pkg = self._rpm_name(m.group(1)) pkgs_to_exclude.add(pkg) details.append(f"{pkg} : dependance manquante") # "Problem: package xxx-version requires yyy" if low.startswith("problem:") or low.startswith("- package"): m = re.search(r'package\s+(\S+)\s+requires', line, re.I) if m: pkg = self._rpm_name(m.group(1)) pkgs_to_exclude.add(pkg) # "(try to add '--skip-broken' to skip uninstallable packages)" if "skip-broken" in low: has_error = True # Bloc vide = fin du bloc erreur if low == "" and in_error_block: in_error_block = False return has_error, list(pkgs_to_exclude), details def _patch_worker(self, servers): total = len(servers) packages = self.packages_var.get().strip() exclude_kernel = self.exclude_kernel_var.get() dryrun = self.dryrun_var.get() approved_cmd = None # commande approuvée pour "appliquer à tous" for idx, s in enumerate(servers): if not self.running: break server = s["server"] # Skip serveurs deja traites (reprise apres STOP) if s.get("status") in ("OK", "IGNORE"): self._log_patch(f"\n[{idx+1}/{total}] ── {server} ── (deja traite, skip)", "info") self._update_progress(idx+1, total) continue self._log_patch(f"\n[{idx+1}/{total}] ── {server} ──", "info") # Construire la commande yum yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel, dryrun, server) # Confirmation commande (sauf si "appliquer à tous" déjà validé) if not self.apply_all_cmd: event = threading.Event() dialog_result = [None] def show_dialog(cmd=yum_cmd, srv=server, ev=event, res=dialog_result): dlg = YumConfirmDialog(self.root, srv, cmd) res[0] = dlg.result if dlg.result in ("apply","apply_all"): yum_cmd_final = dlg.final_cmd res.append(yum_cmd_final) ev.set() self.root.after(0, show_dialog) event.wait() action = dialog_result[0] if action == "cancel": self._log_patch(f" ↩ Serveur ignoré par l'utilisateur", "warn") # Si debut Teams deja envoye (reprise), envoyer annulation if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"): self._send_teams_notification(s, "annulation") s["_teams_fin_sent"] = True s["status"] = "IGNORE" self._update_tree_status(s) continue elif action == "apply_all": self.apply_all_cmd = True if len(dialog_result) > 1: yum_cmd = dialog_result[1] approved_cmd = yum_cmd elif action == "apply": if len(dialog_result) > 1: yum_cmd = dialog_result[1] else: if approved_cmd: yum_cmd = approved_cmd self._log_patch(f" Connexion...", "info") t0 = time.time() client, eff_host = ssh_connect( server, s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if client is None: dur = round(time.time() - t0, 1) self._log_patch(f" ❌ Connexion impossible ({dur}s)", "ko") # Si debut Teams envoye (reprise apres crash), envoyer annulation if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"): self._send_teams_notification(s, "annulation") s["_teams_fin_sent"] = True s["status"] = "AUTH_KO" self.results.append({**s, "duration": f"{dur}s", "patch_status":"KO","patch_detail":"Connexion impossible"}) self._update_tree_status(s) self._update_progress(idx+1, total) continue self._log_patch(f" ✅ Connecté : {eff_host}", "ok") # Détection physique s["is_physical"] = detect_physical(client) if s["is_physical"]: self._log_patch(f" 🖥 Serveur physique détecté", "warn") # Pré-patching self._log_patch(f" Pre-patching...", "info") pre_out, _ = run_cmd(client, PRE_PATCH_SCRIPT, 30) if "PRE_PATCH_OK" in pre_out: self._log_patch(f" Pre-patch OK", "ok") else: self._log_patch(f" Pre-patch incomplet", "warn") # Flux Libre Podman — snapshot pods (pas les BST) is_fl = "flux libre" in s.get("domain", "").lower() and _is_fl_podman(server) if is_fl: self._log_patch(f" Flux Libre : snapshot pods Podman...", "info") fl_out, _ = run_cmd(client, FL_PRE_PATCH_SCRIPT, 30) if "FL_PRE_OK" in fl_out: for line in fl_out.splitlines(): if line.startswith("FL_USER=") or line.startswith("FL_PODS_SAVED="): self._log_patch(f" {line}", "ok") elif "FL_NO_PODS" in fl_out: self._log_patch(f" Pas de pods Podman detectes", "info") else: self._log_patch(f" Snapshot pods incomplet", "warn") # Snapshot versions avant ver_before = {} if packages: for pkg in packages.split(): out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15) ver_before[pkg] = out.strip() # Detecter le gestionnaire de paquets distant (yum/dnf vs apt) pkg_mgr_out, _ = run_cmd(client, "command -v apt 2>/dev/null && echo APT || (command -v dnf 2>/dev/null && echo DNF || echo YUM)", 5) is_apt = "APT" in pkg_mgr_out # Si apt detecte et commande yum → adapter automatiquement if is_apt and ("yum " in yum_cmd or "dnf " in yum_cmd): self._log_patch(f" Systeme Debian/Ubuntu detecte → adaptation apt", "info") if dryrun: yum_cmd = "sudo apt update 2>&1 && apt list --upgradable 2>/dev/null" else: yum_cmd = "sudo apt update 2>&1 && sudo DEBIAN_FRONTEND=noninteractive apt upgrade -y 2>&1" elif is_apt and "apt " in yum_cmd: # Commande apt personnalisee — juste ajouter DEBIAN_FRONTEND et nettoyer les pipes yum if "DEBIAN_FRONTEND" not in yum_cmd: yum_cmd = yum_cmd.replace("sudo apt", "sudo DEBIAN_FRONTEND=noninteractive apt") # Supprimer les filtres grep yum qui cassent la sortie apt if "| grep" in yum_cmd: yum_cmd = yum_cmd.split("2>&1")[0] + "2>&1" if "2>&1" in yum_cmd else yum_cmd.split("2>/dev/null")[0] + "2>&1" # Notification Teams : debut d'intervention (une seule fois par serveur) if not s.get("_teams_debut_sent"): self._send_teams_notification(s, "debut") s["_teams_debut_sent"] = True self._log_patch(f" Commande : {yum_cmd[:120]}...", "cmd") self.audit("PATCH_CMD", f"{server} | {yum_cmd[:120]}") # Callback pour affichage temps reel def _on_yum_line(line, _s=server): self._log_patch(f" {line}", "info") yum_out, yum_err = run_cmd_stream(client, yum_cmd, 300, on_line=_on_yum_line) yum_full = yum_out + "\n" + yum_err # ── Gestion erreurs yum (retry intelligent) ── extra_excludes = "" all_excluded_pkgs = [] gpg_retried = False if not dryrun: for retry in range(4): yum_low = yum_full.lower() # 1) Erreur GPG → retry avec --nogpgcheck if not gpg_retried and ("gpg check failed" in yum_low or "gpg keys" in yum_low and "incorrect" in yum_low): gpg_retried = True self._log_patch(f" ⚠ Erreur GPG detectee → retry avec --nogpgcheck", "warn") yum_cmd_gpg = yum_cmd.replace("yum update -y", "yum update -y --nogpgcheck") self._log_patch(f" Commande : {yum_cmd_gpg[:120]}...", "cmd") yum_out, yum_err = run_cmd_stream(client, yum_cmd_gpg, 300, on_line=_on_yum_line) yum_full = yum_out + "\n" + yum_err yum_cmd = yum_cmd_gpg continue # 2) Erreur dependances → detecter paquets bloques et les exclure has_dep_err, dep_pkgs, dep_details = self._detect_dep_errors(yum_full) if not has_dep_err or not dep_pkgs: break # Eviter boucle infinie (memes paquets) new_pkgs = [p for p in dep_pkgs if p not in all_excluded_pkgs] if not new_pkgs: break all_excluded_pkgs.extend(new_pkgs) for d in dep_details: self._log_patch(f" ⚠ {d}", "warn") new_excludes = " ".join(f"--exclude=*{p}*" for p in new_pkgs) extra_excludes += " " + new_excludes self._log_patch(f" ↻ Paquets bloques exclus : {', '.join(new_pkgs)}", "warn") yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel, dryrun, server, extra_excludes) if gpg_retried: yum_cmd = yum_cmd.replace("yum update -y", "yum update -y --nogpgcheck") self._log_patch(f" Commande : {yum_cmd[:120]}...", "cmd") yum_out, yum_err = run_cmd_stream(client, yum_cmd, 300, on_line=_on_yum_line) yum_full = yum_out + "\n" + yum_err if all_excluded_pkgs: self._log_patch(f" Paquets exclus (dependances) : {', '.join(set(all_excluded_pkgs))}", "warn") s["dep_excluded"] = list(set(all_excluded_pkgs)) if gpg_retried: self._log_patch(f" --nogpgcheck utilise", "warn") # Snapshot versions après ver_after = {} if packages: for pkg in packages.split(): out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15) ver_after[pkg] = out.strip() dur = round(time.time() - t0, 1) # Detecter erreur residuelle (apres retries) final_err, final_dep_pkgs, _ = self._detect_dep_errors(yum_full) has_yum_error = final_err and final_dep_pkgs # Erreurs fatales non recuperables yum_low = yum_full.lower() if "gpg check failed" in yum_low and not gpg_retried: has_yum_error = True self._log_patch(f" ❌ Erreur GPG non resolue", "ko") if "timeout" in yum_low and ("mirror" in yum_low or "metadata" in yum_low): has_yum_error = True self._log_patch(f" ❌ Timeout repo/mirror", "ko") if "cannot prepare internal mirrorlist" in yum_low: has_yum_error = True self._log_patch(f" ❌ Erreur mirrorlist", "ko") # Déterminer statut if dryrun: pkg_lines = [l for l in yum_out.splitlines() if l.split() and "." in l.split()[0] and not any(x in l for x in ["kB","bps","00:","Updat"])] if pkg_lines: s["patch_status"] = "UPDATE_AVAIL" s["patch_detail"] = " | ".join(pkg_lines)[:200] self._log_patch(f" ⚠ Updates disponibles : {len(pkg_lines)} package(s)", "warn") else: s["patch_status"] = "UP_TO_DATE" s["patch_detail"] = "Deja a jour" self._log_patch(f" ✅ A jour", "ok") elif has_yum_error and "Complete!" not in yum_out: # Erreur yum non resolue excluded_str = ", ".join(s.get("dep_excluded", [])) or ", ".join(final_dep_pkgs) s["patch_status"] = "DEP_ERROR" s["patch_detail"] = f"Erreur dependances : {excluded_str}"[:200] self._log_patch(f" ❌ ERREUR dependances : {excluded_str}", "ko") self.audit("PATCH_ERROR", f"{server} | DEP_ERROR | {excluded_str[:100]}") else: updated = [p for p in (packages.split() if packages else []) if ver_before.get(p) != ver_after.get(p)] # Compter les paquets mis a jour (RHEL + Debian) import re pkg_count = 0 pkg_names = [] for line in yum_full.splitlines(): # RHEL: "Updated:" ou "Upgraded:" ou "Installed:" dans le résumé # Ligne de paquet yum: " openssl.x86_64 1:1.1.1k-14.el8_10" m_yum = re.match(r'^\s{2,}(\S+\.\S+)\s+\S+', line) if m_yum and not any(x in line for x in ["kB","bps","00:","http","Updat","Load"]): pkg_names.append(m_yum.group(1).split(".")[0]) # Debian: "XX mis à jour, XX nouvellement installés" m_apt = re.search(r'(\d+)\s+mis\s+[àa]\s+jour', line) if m_apt: pkg_count = max(pkg_count, int(m_apt.group(1))) # Debian EN: "XX upgraded" m_apt_en = re.search(r'(\d+)\s+upgraded', line) if m_apt_en: pkg_count = max(pkg_count, int(m_apt_en.group(1))) # Debian: lignes "Inst paquet version" m_inst = re.match(r'^Inst\s+(\S+)', line) if m_inst: pkg_names.append(m_inst.group(1)) # Debian: "Préparation du dépaquetage de .../paquet_version" m_deb = re.match(r'^D[eé]paquetage de\s+(\S+)', line) if m_deb: pkg_names.append(m_deb.group(1)) # Detecter si patché is_patched = ( updated or "Complete!" in yum_out # RHEL yum or "Complet" in yum_out # RHEL dnf FR or pkg_count > 0 # Debian "N mis à jour" or len(pkg_names) > 0 # Paquets detectes ) # Detecter si rien a faire is_uptodate = ( "Nothing to do" in yum_out or "No packages" in yum_out or "0 mis à jour" in yum_full or "0 upgraded" in yum_full or "Rien à faire" in yum_full ) if is_patched and not is_uptodate: s["patch_status"] = "PATCHED" if updated: detail = " | ".join( f"{p}: {ver_before.get(p,'?')} -> {ver_after.get(p,'?')}" for p in updated)[:200] elif pkg_names: unique_pkgs = list(dict.fromkeys(pkg_names)) # dedup preserving order n = len(unique_pkgs) detail = f"{n} paquet(s) : {', '.join(unique_pkgs[:10])}" if n > 10: detail += f" ... +{n-10}" elif pkg_count > 0: detail = f"{pkg_count} paquet(s) mis a jour" else: detail = "Packages mis a jour" if s.get("dep_excluded"): detail += f" (exclus: {', '.join(s['dep_excluded'])})" s["patch_detail"] = detail[:200] self._log_patch(f" ✅ PATCHE : {s['patch_detail'][:120]}", "ok") self.audit("PATCH_DONE", f"{server} | PATCHE | {s['patch_detail'][:100]}") elif is_uptodate: s["patch_status"] = "UP_TO_DATE" s["patch_detail"] = "Deja a jour" self._log_patch(f" ✅ Deja a jour", "ok") self.audit("PATCH_DONE", f"{server} | DEJA A JOUR") else: s["patch_status"] = "UP_TO_DATE" s["patch_detail"] = "Deja a jour" # Vérifier reboot nécessaire (RHEL: needs-restarting, Debian: /var/run/reboot-required) reboot_out, _ = run_cmd(client, "if command -v needs-restarting &>/dev/null; then " "needs-restarting -r 2>/dev/null; echo RC:$?; " "elif [ -f /var/run/reboot-required ]; then echo RC:1; " "else echo RC:0; fi", 15) if "RC:1" in reboot_out: s["reboot_required"] = True self._log_patch(f" ⚠ REBOOT REQUIS", "warn") else: s["reboot_required"] = False s["status"] = "OK" s["duration"] = f"{dur}s" # Notification Teams : fin d'intervention (apres patching) if not s.get("_teams_fin_sent"): self._send_teams_notification(s, "fin") s["_teams_fin_sent"] = True client.close() self.results.append(deepcopy(s)) self._update_tree_status(s) self._update_progress(idx+1, total) # Fin self.running = False self.root.after(0, lambda: [ self.go_btn.configure(state="normal"), self.stop_btn.configure(state="disabled"), self._patch_done_summary() ]) def _update_tree_status(self, s): def _do(): if not self.srv_tree.exists(s["server"]): return vals = list(self.srv_tree.item(s["server"], "values")) vals[9] = s.get("status", "—") # statut est maintenant col 9 tag = ("ok" if s.get("status")=="OK" and s.get("patch_status") not in ("KO",) else "ko" if s.get("status")=="AUTH_KO" else "normal") self.srv_tree.item(s["server"], values=vals, tags=(tag,)) self.root.after(0, _do) def _update_progress(self, done, total): pct = (done/total)*100 self.root.after(0, lambda: [ self.prog_var.set(pct), self.prog_lbl.configure(text=f"{done}/{total} ({pct:.0f}%)")]) def _patch_done_summary(self): ok_c = sum(1 for r in self.results if r.get("status")=="OK") p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED") rb_c = sum(1 for r in self.results if r.get("reboot_required")) ko_c = sum(1 for r in self.results if r.get("status")!="OK") msg = (f"Patch termine !\n\n" f"Total traite : {len(self.results)}\n" f"OK : {ok_c}\n" f"Patche : {p_c}\n" f"Reboot requis: {rb_c}\n" f"KO/Auth : {ko_c}") messagebox.showinfo("Patch termine", msg) if rb_c > 0: messagebox.showwarning("Reboot requis", f"{rb_c} serveur(s) necessitent un reboot.\n" "Utiliser l'onglet Post-patching > Reboot.") # Auto-générer le rapport dans l'onglet 4 self.root.after(500, self._generate_report) # Basculer sur l'onglet rapport self.root.after(600, lambda: self.nb.select(self.tab_report)) # ========================================================================== # ONGLET 3 — POST-PATCHING # ========================================================================== def _post_patch_check(self): targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not targets: messagebox.showwarning("Attention", "Aucun serveur sélectionné.") return threading.Thread(target=self._post_worker, args=(targets,), daemon=True).start() def _post_worker(self, servers): for s in servers: self._log_post(f"\n── {s['server']} ──", "info") client, eff = ssh_connect(s["server"], s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(f" Connexion impossible", "ko") continue # Verifier que le pre-patch a ete execute aujourd'hui check, _ = run_cmd(client, "find /tmp -name 'secops_services_avant_*' -mtime 0 2>/dev/null | head -1", 10) if not check or not check.strip(): self._log_post(f" BLOQUE : pas de pre-patch aujourd'hui sur ce serveur", "ko") self._log_post(f" Le fichier /tmp/secops_services_avant_* est absent ou trop ancien", "ko") self._log_post(f" Lancez d'abord un patching (le pre-patch s'execute automatiquement)", "warn") client.close() continue self._log_post(f" Pre-patch trouve : {check.strip()}", "ok") self.audit("POST_CHECK", f"{s['server']} | verification post-patching") out, _ = run_cmd(client, POST_PATCH_SCRIPT, 60) for line in out.splitlines(): if "ALERTE" in line or "disparu" in line.lower(): self._log_post(f" {line}", "ko") elif "WARN" in line or "nouveau" in line.lower(): self._log_post(f" {line}", "warn") elif "OK" in line: self._log_post(f" {line}", "ok") else: self._log_post(f" {line}", "info") # Flux Libre Podman — verifier et redemarrer les pods (pas les BST) is_fl = "flux libre" in s.get("domain", "").lower() and _is_fl_podman(s["server"]) if is_fl: self._log_post(f" Flux Libre : verification pods Podman...", "info") fl_out, _ = run_cmd(client, FL_POST_PATCH_SCRIPT, 60) for line in fl_out.splitlines(): if "FL_RESTART=" in line: pod = line.split("=", 1)[1] self._log_post(f" Redemarrage pod: {pod}", "warn") elif "FL_ALL_OK" in line: self._log_post(f" Tous les pods sont actifs", "ok") elif "FL_RESTARTED=" in line: n = line.split("=", 1)[1] self._log_post(f" {n} pod(s) redemarres", "warn") elif "FL_USER=" in line: self._log_post(f" {line}", "info") elif "FL_NO_PODS" in line: self._log_post(f" Pas de pods Podman", "info") # Notification Teams : fin d'intervention (une seule fois par serveur) if not s.get("_teams_fin_sent"): self._send_teams_notification(s, "fin") s["_teams_fin_sent"] = True client.close() def _ask_mark_patched(self): """Demande confirmation puis marque les serveurs patchés en vert dans Excel""" patched = [s for s in self.servers if s.get("patch_status") == "PATCHED" and s.get("selected")] if not patched: patched = [r for r in self.results if r.get("patch_status") == "PATCHED"] if not patched: messagebox.showwarning("Attention", "Aucun serveur patche detecte.\n" "Selectionner manuellement les serveurs a marquer.") patched = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not patched: return names = "\n".join(s["server"] for s in patched[:10]) if len(patched) > 10: names += f"\n... et {len(patched)-10} autres" if messagebox.askyesno( "Marquer comme patches dans Excel", f"Marquer {len(patched)} serveur(s) en VERT dans le fichier Excel ?\n\n{names}\n\nCette action modifie le fichier Excel original.", icon="question"): self._mark_patched_excel(patched) def _delete_old_snapshots(self): """Supprime les snapshots SLPM du patcheur courant > 3 jours""" if not VSPHERE_OK: messagebox.showwarning("pyVmomi manquant", "pyVmomi non installe.\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080") return selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not selected: messagebox.showwarning("Attention", "Aucun serveur selectionne.") return snap_id = self._get_snap_identifier() if not messagebox.askyesno("Supprimer snapshots", f"Supprimer vos snapshots SLPM de plus de 3 jours ?\n\n" f"Patcheur : {snap_id}\n" f"Serveurs : {len(selected)}\n\n" f"Seuls les snapshots SLPM_{snap_id}_* seront supprimes.\n" f"Les snapshots des autres patcheurs et les snapshots manuels ne seront pas touches."): return creds = self._ensure_vcenter_pwd() if not creds: self._log_post("Suppression annulee (pas de credentials vCenter)", "warn") return vs_user, vs_pwd = creds self._log_post(f"Suppression snapshots SLPM_{snap_id}_* > 3 jours...", "info") threading.Thread(target=self._snap_delete_worker, args=(selected, vs_user, vs_pwd, 3), daemon=True).start() def _snap_delete_worker(self, servers, vs_user, vs_pwd, max_days): """Supprime les snapshots SLPM du patcheur courant vieux de PLUS de max_days jours""" snap_id = self._get_snap_identifier() from datetime import timezone cutoff = datetime.now(tz=timezone.utc) - __import__("datetime").timedelta(days=max_days) connected_vcs = {} for vc in VSPHERE_HOSTS: try: import ssl ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx, connectionPoolTimeout=10) vc_content = si.RetrieveContent() self._log_post(f"Connecte vCenter : {vc}", "ok") connected_vcs[vc] = (si, vc_content) except Exception as e: self._log_post(f"Inaccessible : {vc} — {str(e)[:50]}", "ko") if not connected_vcs: self._log_post("Aucun vCenter joignable", "ko") return for s in servers: vc_order = get_vcenter_order_for_server(s["server"]) for vc in vc_order: if vc not in connected_vcs: continue si, vc_content = connected_vcs[vc] vm = self._find_vm(vc_content, s["server"]) if not vm: continue self._log_post(f"\n VM : {vm.name} sur {vc}", "info") def get_snapshots(snap_list): snaps = [] for snap in snap_list: snaps.append(snap) snaps.extend(get_snapshots(snap.childSnapshotList)) return snaps if not vm.snapshot: self._log_post(f" Aucun snapshot", "info") continue all_snaps = get_snapshots(vm.snapshot.rootSnapshotList) for snap in all_snaps: # Seuls les snapshots SLPM du patcheur courant if not snap.name.startswith("SLPM_"): self._log_post(f" {snap.name} — ignore (pas SLPM)", "info") continue if snap_id.lower() not in snap.name.lower(): self._log_post(f" {snap.name} — ignore (autre patcheur)", "info") continue snap_time = snap.createTime if snap_time.tzinfo is None: snap_time = snap_time.replace(tzinfo=timezone.utc) age_days = (datetime.now(tz=timezone.utc) - snap_time).days if age_days > max_days: self._log_post( f" {snap.name} ({age_days}j) — SUPPRESSION (>{max_days}j)", "warn") try: task = snap.snapshot.RemoveSnapshot_Task(removeChildren=False) while task.info.state not in ("success","error"): time.sleep(1) if task.info.state == "success": self._log_post(f" OK supprime : {snap.name}", "ok") self.audit("SNAP_DELETE", f"{s['server']} | {snap.name} | {age_days}j") else: self._log_post(f" ERREUR suppression : {snap.name}", "ko") except Exception as e: self._log_post(f" ERREUR : {e}", "ko") else: # Récent (≤ max_days jours) → conserver self._log_post( f" Snapshot : {snap.name} ({age_days}j) — conservé (<= {max_days}j)", "info") break for vc, (si, _) in connected_vcs.items(): try: Disconnect(si) except: pass def _remove_old_kernels(self): targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not targets: messagebox.showwarning("Attention", "Aucun serveur selectionne.") return if not messagebox.askyesno("Confirmation", f"Supprimer les anciens kernels sur {len(targets)} serveur(s) ?\n\n" f"Garde uniquement le kernel actif + le dernier installe."): return def worker(): for s in targets: self._log_post(f"\n{'='*50}", "info") self._log_post(f"{s['server']} — Suppression anciens kernels", "info") self._log_post(f"{'='*50}", "info") client, _ = ssh_connect(s["server"], s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(f" Connexion impossible", "ko") continue # 1. Lister les kernels installes self._log_post(f" Kernels installes :", "info") out, _ = run_cmd(client, "rpm -q kernel kernel-core 2>/dev/null | sort", 15) if out: for line in out.splitlines(): self._log_post(f" {line}", "info") else: self._log_post(f" Aucun kernel trouve", "warn") # 2. Kernel actif active, _ = run_cmd(client, "uname -r", 10) self._log_post(f" Kernel actif : {active}", "ok") # 3. Compter les kernels count, _ = run_cmd(client, "rpm -q kernel kernel-core 2>/dev/null | wc -l", 10) nb = int(count.strip()) if count.strip().isdigit() else 0 self._log_post(f" Nombre de kernels : {nb}", "info") if nb <= 2: self._log_post(f" Rien a supprimer (2 kernels max)", "ok") client.close() continue # 4. Supprimer les anciens (garder 2 : actif + dernier) self._log_post(f" Suppression (garde 2)...", "warn") # RHEL 8+ : dnf cmd = ( "if command -v dnf &>/dev/null; then " " sudo dnf remove --oldinstallonly --setopt installonly_limit=2 -y 2>&1; " "else " " sudo package-cleanup --oldkernels --count=2 -y 2>&1; " "fi" ) out, _ = run_cmd(client, cmd, 120) if out: for line in out.splitlines()[-10:]: if "removed" in line.lower() or "erasing" in line.lower(): self._log_post(f" {line}", "ok") elif "error" in line.lower(): self._log_post(f" {line}", "ko") elif "nothing" in line.lower() or "no packages" in line.lower(): self._log_post(f" {line}", "info") else: self._log_post(f" {line}", "info") # 5. Verification after, _ = run_cmd(client, "rpm -q kernel kernel-core 2>/dev/null | sort", 10) self._log_post(f" Kernels restants :", "info") if after: for line in after.splitlines(): self._log_post(f" {line}", "ok") client.close() threading.Thread(target=worker, daemon=True).start() def _undo_yum(self): target = None for s in self.servers: if s.get("selected") and s["accord"]=="oui": target = s break if not target: messagebox.showwarning("Attention", "Sélectionner au moins un serveur.") return self._log_post(f"\n── {target['server']} — Historique yum ──", "info") def worker(): client, _ = ssh_connect(target["server"], target["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(" ❌ Connexion impossible", "ko") return out, _ = run_cmd(client, "sudo yum history list 2>/dev/null | head -20", 20) client.close() self._log_post(out, "info") # Demander l'ID à annuler def ask(): tid = simpledialog.askstring("Undo yum", f"Historique yum sur {target['server']} :\n\n{out}\n\nID de transaction à annuler :") if tid: threading.Thread(target=do_undo, args=(tid,), daemon=True).start() def do_undo(tid): self.audit("UNDO_YUM", f"{target['server']} | transaction {tid}") self._log_post(f" Undo transaction {tid}...", "warn") c2, _ = ssh_connect(target["server"], target["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not c2: self._log_post(" ❌ Connexion impossible", "ko") return out2, _ = run_cmd(c2, f"sudo yum history undo {tid} -y 2>&1 | tail -15", 120) for line in out2.splitlines(): self._log_post(f" {line}", "ok" if "Complete" in line else "info") c2.close() self.root.after(0, ask) threading.Thread(target=worker, daemon=True).start() def _reboot_servers(self): # audit moved to per-server reboot reboot_list = [s for s in self.servers if s.get("selected") and s.get("reboot_required") and s["accord"]=="oui"] if not reboot_list: reboot_list = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not reboot_list: messagebox.showwarning("Attention", "Aucun serveur sélectionné.") return names = "\n".join(s["server"] for s in reboot_list[:10]) if not messagebox.askyesno("⚠️ Confirmation reboot", f"Redémarrer {len(reboot_list)} serveur(s) ?\n\n{names}", icon="warning"): return def worker(): for s in reboot_list: self._log_post(f"\n── {s['server']} — Reboot ──", "warn") client, _ = ssh_connect(s["server"], s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(" ❌ Connexion impossible", "ko"); continue # Notification Teams AVANT le reboot (une seule fois par serveur) if not s.get("_teams_reboot_sent"): self._send_teams_notification(s, "reboot") s["_teams_reboot_sent"] = True self.audit("REBOOT", f"{s['server']} | reboot dans 15s") self._log_post(f" Notification reboot envoyee", "info") # Reboot dans 15 secondes (laisse le temps a la synchro SharePoint) run_cmd(client, "sudo nohup bash -c 'sleep 15 && shutdown -r now' &>/dev/null &", 5) client.close() self._log_post(f" Reboot lance (dans 15s)", "ok") threading.Thread(target=worker, daemon=True).start() # ========================================================================== # ONGLET 4 — RAPPORT # ========================================================================== def _generate_report(self): """Génère le rapport visuel — KPI widgets + tableau détaillé + log brut""" if not self.results: messagebox.showwarning("Attention", "Aucun résultat de patch disponible.") return now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") week = date.today().isocalendar()[1] # ── Calcul des KPI ──────────────────────────────────────────────────── total = len(self.results) ok_c = sum(1 for r in self.results if r.get("status")=="OK") p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED") uptodate = sum(1 for r in self.results if r.get("patch_status") in ("UP_TO_DATE","UP_TO_DATE")) rb_c = sum(1 for r in self.results if r.get("reboot_required")) ko_c = sum(1 for r in self.results if r.get("status")!="OK") # Mettre à jour les widgets KPI self.kpi_widgets["total" ].configure(text=str(total)) self.kpi_widgets["ok" ].configure(text=str(ok_c)) self.kpi_widgets["patched" ].configure(text=str(p_c)) self.kpi_widgets["uptodate"].configure(text=str(uptodate)) self.kpi_widgets["reboot" ].configure(text=str(rb_c)) self.kpi_widgets["ko" ].configure(text=str(ko_c)) # ── Peupler le tableau détaillé ─────────────────────────────────────── for item in self.report_tree.get_children(): self.report_tree.delete(item) for r in self.results: accord = "OUI" if r.get("accord")=="oui" else "NON" snap = "OK" if r.get("snap_done") else ("PHY" if r.get("is_physical") else "—") conn = r.get("status","?") ps = r.get("patch_status","?") rb = "OUI" if r.get("reboot_required") else "—" detail = r.get("patch_detail","")[:60] # Choisir le tag couleur if r.get("status") != "OK": tag = "ko" elif r.get("patch_status") == "PATCHED": tag = "patched" elif r.get("reboot_required"): tag = "reboot" elif r.get("patch_status") in ("UP_TO_DATE",): tag = "uptodate" else: tag = "ok" self.report_tree.insert("", "end", values=( r.get("server",""), r.get("env",""), r.get("domain",""), accord, snap, conn, ps, rb, detail, ), tags=(tag,)) # ── Log brut ────────────────────────────────────────────────────────── self.report_txt.configure(state="normal") self.report_txt.delete("1.0", "end") def w(txt, tag="info"): self.report_txt.insert("end", txt+"\n", tag) w(f"{'='*70}", "title") w(f" RAPPORT PATCHING — Semaine {week} — {now}", "title") w(f" Fichier : {self.excel_var.get()}", "info") w(f" Intervenant : {self.settings.get('patcher','—')}", "info") w(f"{'='*70}", "title") w("") w(f" RESUME : {total} traites | {ok_c} OK | {p_c} patches | {rb_c} reboot | {ko_c} KO", "info") w("") w(f" {'SERVEUR':<35} {'STATUT':<12} {'PATCH':<16} {'REBOOT'}", "title") w(f" {'-'*66}", "info") for r in self.results: status = r.get("status","?") ps = r.get("patch_status","?") rb = "REBOOT" if r.get("reboot_required") else "" tag = "ok" if status=="OK" else "ko" if r.get("patch_status") == "PATCHED": tag = "ok" if r.get("reboot_required"): tag = "warn" w(f" {r.get('server',''):<35} {status:<12} {ps:<16} {rb}", tag) if r.get("patch_detail"): w(f" → {r['patch_detail'][:80]}", "info") w("") w(f"{'='*70}", "title") self.report_txt.configure(state="disabled") def _mark_patched_excel(self, servers_to_mark): """Met le fond vert sur les lignes patchees dans le fichier Excel. Utilise une copie temporaire pour eviter les conflits OneDrive/verrou.""" import shutil import tempfile filepath = self.excel_var.get().strip() sheet = self.sheet_var.get() if not filepath or not os.path.exists(filepath): messagebox.showerror("Erreur Excel", "Fichier Excel introuvable") return try: from openpyxl import load_workbook from openpyxl.styles import PatternFill # Copie temporaire pour eviter le verrou OneDrive tmp_dir = tempfile.mkdtemp() tmp_path = os.path.join(tmp_dir, os.path.basename(filepath)) shutil.copy2(filepath, tmp_path) wb = load_workbook(tmp_path) ws = wb[sheet] green_fill = PatternFill(start_color="00B050", end_color="00B050", fill_type="solid") marked = 0 for s in servers_to_mark: row_num = s.get("excel_row") if not row_num: for row in ws.iter_rows(): if row[0].value == s["server"]: row_num = row[0].row break if row_num: for col in range(1, ws.max_column + 1): ws.cell(row=row_num, column=col).fill = green_fill marked += 1 # Sauvegarder dans le temp wb.save(tmp_path) wb.close() # Tenter de recopier vers l'original try: shutil.copy2(tmp_path, filepath) self._log_patch(f" {marked} ligne(s) marquee(s) en vert dans Excel", "ok") messagebox.showinfo("Excel mis a jour", f"{marked} serveur(s) marques comme patches (fond vert) dans :\n{filepath}") except PermissionError: # OneDrive verrouille le fichier — sauvegarder a cote backup_path = filepath.replace(".xlsx", f"_patched_{datetime.now():%Y%m%d_%H%M}.xlsx") shutil.copy2(tmp_path, backup_path) self._log_patch(f" Excel verrouille, sauvegarde dans : {backup_path}", "warn") messagebox.showwarning("Excel verrouille", f"Le fichier Excel est verrouille (OneDrive).\n\n" f"Une copie a ete sauvegardee :\n{backup_path}\n\n" f"Fermez le fichier original puis copiez la version patchee.") # Nettoyage temp try: os.remove(tmp_path) os.rmdir(tmp_dir) except Exception: pass self.audit("MARK_EXCEL", f"{marked} serveurs marques, sheet={sheet}") except Exception as e: messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}") def _export_csv(self): if not self.results: messagebox.showwarning("Attention","Aucun résultat.") return rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports") os.makedirs(rep_dir, exist_ok=True) csv_file = os.path.join(rep_dir, f"patch_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d_%H%M%S}.csv") fields = ["server","env","domain","app","accord","status","patch_status", "patch_detail","reboot_required","snap_done","is_physical"] with open(csv_file,"w",newline="",encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore") w.writeheader(); w.writerows(self.results) messagebox.showinfo("Export CSV", f"Fichier : {csv_file}") def _export_dokuwiki(self): if not self.results: messagebox.showwarning("Attention","Aucun résultat.") return rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports") os.makedirs(rep_dir, exist_ok=True) wiki_file = os.path.join(rep_dir, f"patch_wiki_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d}.txt") week = date.today().isocalendar()[1] with open(wiki_file,"w",encoding="utf-8") as f: f.write(f"====== Rapport Patching — Semaine {week} ======\n\n") f.write(f"//Généré le {datetime.now():%d/%m/%Y %H:%M} — MYPCZEN / SANEF DSI-SOC//\n\n") f.write("===== Résumé =====\n\n") f.write(f"^ Serveurs traités ^ Patchés ^ Reboot requis ^ KO ^\n") ok_c = sum(1 for r in self.results if r.get("status")=="OK") p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED") rb_c = sum(1 for r in self.results if r.get("reboot_required")) ko_c = sum(1 for r in self.results if r.get("status")!="OK") f.write(f"| {len(self.results)} | {p_c} | {rb_c} | {ko_c} |\n\n") f.write("===== Détail =====\n\n") f.write("^ Serveur ^ Env ^ Statut ^ Patch ^ Reboot ^ Accord ^ Snapshot ^\n") for r in self.results: rb = "⚠ OUI" if r.get("reboot_required") else "NON" f.write(f"| {r['server']} | {r['env']} | {r.get('status','?')} | " f"{r.get('patch_status','?')} | {rb} | " f"{'OUI' if r.get('accord')=='oui' else 'NON'} | " f"{'OUI' if r.get('snap_done') else 'NON'} |\n") messagebox.showinfo("Export DokuWiki", f"Fichier : {wiki_file}") # ========================================================================== # ONGLET 5 — AUDIT # ========================================================================== def _build_tab_audit(self): tab = self.tab_audit BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN hf = tk.Frame(tab, bg="#181825", pady=6) hf.pack(fill="x") tk.Label(hf, text="Journal d'audit", bg="#181825", fg=ACCENT, font=("Consolas", 13, "bold")).pack(side="left", padx=12) tk.Button(hf, text="Rafraichir", bg=BTN, fg=FG, font=("Consolas", 9), command=self._refresh_audit).pack(side="right", padx=12) tk.Button(hf, text="Exporter CSV", bg=BTN, fg=FG, font=("Consolas", 9), command=self._export_audit_csv).pack(side="right", padx=4) tk.Button(hf, text="Changer mon mot de passe", bg="#fab387", fg="#1e1e2e", font=("Consolas", 9), command=self._change_my_password).pack(side="right", padx=4) cols = ("timestamp", "user", "action", "details") self.audit_tree = ttk.Treeview(tab, columns=cols, show="headings") self.audit_tree.heading("timestamp", text="Date/Heure") self.audit_tree.heading("user", text="Utilisateur") self.audit_tree.heading("action", text="Action") self.audit_tree.heading("details", text="Details") self.audit_tree.column("timestamp", width=160) self.audit_tree.column("user", width=120) self.audit_tree.column("action", width=150) self.audit_tree.column("details", width=500) sb = ttk.Scrollbar(tab, orient="vertical", command=self.audit_tree.yview) self.audit_tree.configure(yscrollcommand=sb.set) self.audit_tree.pack(side="left", fill="both", expand=True, padx=6, pady=4) sb.pack(side="right", fill="y", pady=4) self._refresh_audit() def _refresh_audit(self): self.audit_tree.delete(*self.audit_tree.get_children()) for log in self.db.get_logs(500): self.audit_tree.insert("", "end", values=( log["timestamp"], log["username"], log["action"], log.get("details", ""))) def _export_audit_csv(self): logs = self.db.get_logs(5000) if not logs: messagebox.showinfo("Info", "Aucun log"); return p = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV", "*.csv")], initialfile=f"audit_patch_{datetime.now():%Y%m%d_%H%M%S}.csv") if not p: return with open(p, "w", newline="", encoding="utf-8-sig") as f: w = csv.writer(f, delimiter=";") w.writerow(["Timestamp", "Utilisateur", "Action", "Details"]) for l in logs: w.writerow([l["timestamp"], l["username"], l["action"], l.get("details", "")]) self.audit("EXPORT_AUDIT", f"{len(logs)} entries -> {p}") def _change_my_password(self): dlg = ChangePasswordDialog(self.root, self.db, self.current_user["username"]) # Centrer sur le parent if dlg.success: messagebox.showinfo("OK", "Mot de passe modifie avec succes") # ========================================================================== # ONGLET 6 — UTILISATEURS (admin only) # ========================================================================== def _build_tab_users(self): tab = self.tab_users BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # Creation user cf = tk.Frame(tab, bg="#181825", pady=6) cf.pack(fill="x") tk.Label(cf, text="Gestion des utilisateurs", bg="#181825", fg=ACCENT, font=("Consolas", 13, "bold")).pack(side="left", padx=12) af = tk.Frame(tab, bg=BG, pady=6) af.pack(fill="x", padx=12) tk.Label(af, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(0, 4)) self.new_user_var = tk.StringVar() tk.Entry(af, textvariable=self.new_user_var, bg=BG2, fg=FG, font=("Consolas", 10), width=15, insertbackground=FG).pack(side="left", padx=4) tk.Label(af, text="MDP :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(8, 4)) self.new_pwd_var = tk.StringVar() tk.Entry(af, textvariable=self.new_pwd_var, bg=BG2, fg=FG, font=("Consolas", 10), width=15, show="*", insertbackground=FG).pack(side="left", padx=4) tk.Label(af, text="Role :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(8, 4)) self.new_role_var = tk.StringVar(value="operator") tk.OptionMenu(af, self.new_role_var, "admin", "operator", "viewer").pack(side="left", padx=4) tk.Button(af, text="Creer", bg="#40a02b", fg="white", font=("Consolas", 9, "bold"), command=self._create_user).pack(side="left", padx=8) # Liste users cols = ("username", "role", "locked", "must_change", "last_login") self.users_tree = ttk.Treeview(tab, columns=cols, show="headings") self.users_tree.heading("username", text="Utilisateur") self.users_tree.heading("role", text="Role") self.users_tree.heading("locked", text="Verrouille") self.users_tree.heading("must_change", text="Chg. MDP") self.users_tree.heading("last_login", text="Derniere connexion") self.users_tree.column("username", width=150) self.users_tree.column("role", width=100) self.users_tree.column("locked", width=100) self.users_tree.column("must_change", width=100) self.users_tree.column("last_login", width=180) self.users_tree.pack(fill="both", expand=True, padx=12, pady=6) bf = tk.Frame(tab, bg=BG, pady=6) bf.pack(fill="x", padx=12) tk.Button(bf, text="Supprimer", bg="#f38ba8", fg="#1e1e2e", font=("Consolas", 9, "bold"), command=self._delete_user).pack(side="left", padx=4) tk.Button(bf, text="Deverrouiller", bg="#fab387", fg="#1e1e2e", font=("Consolas", 9, "bold"), command=self._unlock_user).pack(side="left", padx=4) tk.Button(bf, text="Reset MDP", bg=BTN, fg=FG, font=("Consolas", 9), command=self._reset_user_pwd).pack(side="left", padx=4) tk.Button(bf, text="Rafraichir", bg=BTN, fg=FG, font=("Consolas", 9), command=self._refresh_users).pack(side="right", padx=4) self._refresh_users() def _refresh_users(self): self.users_tree.delete(*self.users_tree.get_children()) for u in self.db.list_users(): self.users_tree.insert("", "end", values=( u["username"], u["role"], "OUI" if u["locked"] else "", "OUI" if u["must_change_pwd"] else "", u["last_login"] or "")) def _create_user(self): un = self.new_user_var.get().strip() pw = self.new_pwd_var.get().strip() role = self.new_role_var.get() if not un or not pw: messagebox.showwarning("Erreur", "Saisir un nom et un mot de passe"); return if self.db.create_user(un, pw, role): self.audit("CREATE_USER", f"{un} (role={role})") self.new_user_var.set(""); self.new_pwd_var.set("") self._refresh_users() else: messagebox.showerror("Erreur", f"L'utilisateur '{un}' existe deja") def _delete_user(self): sel = self.users_tree.selection() if not sel: return un = self.users_tree.item(sel[0])["values"][0] if un == "admin": messagebox.showwarning("Erreur", "Impossible de supprimer admin"); return if messagebox.askyesno("Confirmer", f"Supprimer '{un}' ?"): self.db.delete_user(un) self.audit("DELETE_USER", un) self._refresh_users() def _unlock_user(self): sel = self.users_tree.selection() if not sel: return un = self.users_tree.item(sel[0])["values"][0] self.db.unlock_user(un) self.audit("UNLOCK_USER", un) self._refresh_users() def _reset_user_pwd(self): sel = self.users_tree.selection() if not sel: return un = self.users_tree.item(sel[0])["values"][0] new_pwd = simpledialog.askstring("Reset", f"Nouveau mot de passe pour {un} :", show="*") if new_pwd: self.db.reset_password(un, new_pwd) self.audit("RESET_PASSWORD", un) self._refresh_users() # ============================================================================== # MAIN # ============================================================================== if __name__ == "__main__": db = Database() root = tk.Tk() root.withdraw() # Cacher pendant le login # Première initialisation : afficher le mot de passe admin généré if hasattr(db, "_init_password"): messagebox.showinfo( "Premier lancement", f"Base de données initialisée.\n\n" f"Compte admin créé avec le mot de passe :\n\n" f" {db._init_password}\n\n" f"⚠ NOTEZ-LE MAINTENANT — il ne sera plus affiché.\n" f"Vous devrez le changer à la première connexion.", parent=root) del db._init_password login = LoginDialog(root, db) if not login.result: root.destroy() sys.exit(0) user = login.result # Forcer changement de mot de passe au premier login if user.get("must_change_pwd"): dlg = ChangePasswordDialog(root, db, user["username"], forced=True) if not dlg.success: root.destroy() sys.exit(0) root.deiconify() app = PatchManagerV2(root, db, user) root.mainloop()