#!/usr/bin/env python3 # ============================================================================== # patch_manager_v2.py — SANEF Linux Patch Manager GUI v2 # Gestion complète du patching Linux via Excel + SSH + vSphere # # Prérequis : py -m pip install paramiko openpyxl requests pyVmomi --proxy http://proxy.sanef.fr:8080 # Usage : py patch_manager_v2.py # Auteur : MYPCZEN / SANEF DSI - SOC # ============================================================================== import tkinter as tk from tkinter import ttk, filedialog, scrolledtext, messagebox, simpledialog import threading import paramiko import socket import time import csv import os import sys import json from datetime import datetime, date from copy import deepcopy import sqlite3 import hashlib import secrets import ctypes # openpyxl optionnel — installé si absent try: import openpyxl except ImportError: os.system("py -m pip install openpyxl --proxy http://proxy.sanef.fr:8080 -q") import openpyxl # pyVmomi optionnel — pour vSphere try: from pyVim.connect import SmartConnect, Disconnect from pyVmomi import vim VSPHERE_OK = True except ImportError: VSPHERE_OK = False # ============================================================================== # CONSTANTES # ============================================================================== VERSION = "2.0" VSPHERE_HOSTS = [ "vpgesavcs1.sanef.groupe", "vpmetavcs1.sanef.groupe", "vpsicavcs1.sanef.groupe", ] YUM_EXCLUDES_STD = ( "--exclude=*mongodb* --exclude=*mysql* --exclude=*postgres* " "--exclude=*mariadb* --exclude=*oracle* --exclude=*pgdg* --exclude=*php* " "--exclude=*java* --exclude=*redis* --exclude=*elasticsearch* --exclude=*nginx* " "--exclude=*mod_ssl* --exclude=*haproxy* --exclude=*certbot* " "--exclude=*python-certbot* --exclude=*docker* --exclude=*podman* " "--exclude=*centreon* --exclude=*qwserver* " "--exclude=*ansible* --exclude=*node* --exclude=*tina* --exclude=*memcached* " "--exclude=*nextcloud* --exclude=*pgbouncer* --exclude=*pgpool* " "--exclude=*pgbadger* --exclude=*psycopg2* --exclude=*barman* --exclude=*kibana* " "--exclude=*sdcss*" # Symantec DCS agent + sdcss-kmod kernel module ) YUM_EXCLUDES_FLUX_LIBRE = "--exclude=*podman*" PRE_PATCH_SCRIPT = r""" cat > /tmp/secops_pre_patching.sh << 'EOF' HOSTNAME=$(hostname) SNAPSHOT_DIR="/tmp" systemctl list-units --type=service --state=running --no-pager \ | awk '{print $1}' | grep '\.service$' \ > ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt echo "Services sauvegardes : $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt)" ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt ss -tlnup | awk 'NR>1 && $7 != "" { match($7, /users:\(\("([^"]+)"/, arr) split($5, addr, ":") port = addr[length(addr)] if (arr[1] != "" && port+0 < 32768) print port, arr[1] }' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt echo "Ports sauvegardes : $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt)" echo "PRE_PATCH_OK" EOF bash /tmp/secops_pre_patching.sh """ POST_PATCH_SCRIPT = r""" cat > /tmp/secops_post_patching.sh << 'EOF' HOSTNAME=$(hostname) SNAPSHOT_DIR="/tmp" GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m' RAPPORT="/tmp/rapport_patching_${HOSTNAME}_$(date +%Y%m%d_%H%M).txt" systemctl list-units --type=service --state=running --no-pager \ | awk '{print $1}' | grep '\.service$' \ > ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt DISPARUS_SVC=$(comm -23 \ <(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \ | grep -v "user@") APPARUS_SVC=$(comm -13 \ <(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \ | grep -v "setroubleshootd\|user@") echo "--- SERVICES ---" | tee -a ${RAPPORT} echo "Avant: $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) | Apres: $(wc -l < ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT} [ -z "$DISPARUS_SVC" ] && echo "OK - Aucun service disparu" | tee -a ${RAPPORT} || { echo "ALERTE services disparus:" | tee -a ${RAPPORT}; echo "$DISPARUS_SVC" | tee -a ${RAPPORT}; } [ -z "$APPARUS_SVC" ] && echo "OK - Aucun nouveau service" | tee -a ${RAPPORT} || { echo "WARN nouveaux services:" | tee -a ${RAPPORT}; echo "$APPARUS_SVC" | tee -a ${RAPPORT}; } ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt ss -tlnup | awk 'NR>1 && $7 != "" { match($7, /users:\(\("([^"]+)"/, arr) split($5, addr, ":") port = addr[length(addr)] if (arr[1] != "" && port+0 < 32768) print port, arr[1] }' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt PORTS_DISPARUS=$(comm -23 \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt)) PORTS_APPARUS=$(comm -13 \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \ <(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt)) echo "--- PORTS ---" | tee -a ${RAPPORT} echo "Avant: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt) | Apres: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT} [ -z "$PORTS_DISPARUS" ] && echo "OK - Aucun port disparu" | tee -a ${RAPPORT} || { echo "ALERTE ports disparus:" | tee -a ${RAPPORT}; echo "$PORTS_DISPARUS" | tee -a ${RAPPORT}; } [ -z "$PORTS_APPARUS" ] && echo "OK - Aucun nouveau port" | tee -a ${RAPPORT} || { echo "WARN nouveaux ports:" | tee -a ${RAPPORT}; echo "$PORTS_APPARUS" | tee -a ${RAPPORT}; } echo "" | tee -a ${RAPPORT} echo "RAPPORT: ${RAPPORT}" echo "POST_PATCH_OK" EOF bash /tmp/secops_post_patching.sh """ FL_PRE_PATCH_SCRIPT = r""" cat > /tmp/secops_fl_pre_patch.sh << 'FLEOF' # Detecter l'utilisateur applicatif podman APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1) if [ -z "$APP_USER" ]; then echo "FL_NO_PODS" exit 0 fi echo "FL_USER=$APP_USER" # Snapshot pods running via sudo su sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}" --filter "status=running"' \ > /tmp/fl_pods_avant_$(hostname)_$(date +%Y%m%d_%H%M).txt echo "FL_PODS_SAVED=$(wc -l < /tmp/fl_pods_avant_$(hostname)_*.txt | tail -1)" # Status boo_manage si disponible if command -v boo_manage &>/dev/null || sudo su - $APP_USER -c "which boo_manage" &>/dev/null; then sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true fi echo "FL_PRE_OK" FLEOF bash /tmp/secops_fl_pre_patch.sh """ FL_POST_PATCH_SCRIPT = r""" cat > /tmp/secops_fl_post_patch.sh << 'FLEOF' APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1) if [ -z "$APP_USER" ]; then echo "FL_NO_PODS" exit 0 fi echo "FL_USER=$APP_USER" SNAPSHOT=$(ls -t /tmp/fl_pods_avant_$(hostname)_*.txt 2>/dev/null | head -1) if [ -z "$SNAPSHOT" ]; then echo "FL_NO_SNAPSHOT" sudo su - $APP_USER -c "boo_manage -a -c start" 2>/dev/null || true exit 0 fi AVANT=$(cat $SNAPSHOT) APRES=$(sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}"') ARRETES=0 while IFS= read -r pod; do if ! echo "$APRES" | grep -q "^${pod}$"; then echo "FL_RESTART=$pod" sudo su - $APP_USER -c "boo_manage -p ${pod} -c start" 2>/dev/null || \ sudo su - $APP_USER -c "XDG_RUNTIME_DIR=/run/user/\$(id -u) podman start ${pod}" 2>/dev/null ARRETES=$((ARRETES + 1)) fi done <<< "$AVANT" if [ $ARRETES -eq 0 ]; then echo "FL_ALL_OK" else echo "FL_RESTARTED=$ARRETES" fi sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true echo "FL_POST_OK" FLEOF bash /tmp/secops_fl_post_patch.sh """ PRESET_PACKAGES = { "Patch global (avec excludes)": "", "Java": "java-1.8.0-openjdk java-1.8.0-openjdk-headless", "OpenSSL": "openssl", "OpenSSH": "openssh openssh-server openssh-clients", "gnutls + libsoup + rhc": "gnutls libsoup rhc", "glibc": "glibc", "mariadb-libs": "mariadb-libs", "curl": "curl libcurl", "Personnalise...": "CUSTOM", } SETTINGS_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_settings.json") DB_PATH = os.path.join(os.path.expanduser("~"), ".patch_manager.db") # ============================================================================== # DATABASE — Auth + Audit # ============================================================================== _db_lock = threading.Lock() def _hash_password(password, salt=None): if salt is None: salt = secrets.token_hex(16) h = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 310_000) return f"{salt}${h.hex()}" def _verify_password(password, stored): if "$" not in stored: return False salt, _ = stored.split("$", 1) return _hash_password(password, salt) == stored class Database: def __init__(self): self.conn = sqlite3.connect(DB_PATH, check_same_thread=False) self.conn.row_factory = sqlite3.Row self._init_tables() def _exec(self, sql, params=()): with _db_lock: return self.conn.execute(sql, params) def _commit(self): with _db_lock: self.conn.commit() def _init_tables(self): with _db_lock: self.conn.executescript(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'operator', must_change_pwd INTEGER NOT NULL DEFAULT 1, failed_attempts INTEGER NOT NULL DEFAULT 0, locked INTEGER NOT NULL DEFAULT 0, created_at TEXT DEFAULT (datetime('now','localtime')), last_login TEXT ); CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT DEFAULT (datetime('now','localtime')), username TEXT, action TEXT NOT NULL, details TEXT ); """) # Compte admin par defaut row = self.conn.execute("SELECT id FROM users WHERE username='admin'").fetchone() if not row: h = _hash_password("pzBL4l9p24t*$zbYnxHs1mBvp") self.conn.execute( "INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,?)", ("admin", h, "admin", 1)) self.conn.commit() def authenticate(self, username, password): row = self._exec("SELECT * FROM users WHERE LOWER(username)=LOWER(?)", (username,)).fetchone() if not row: return None, "Utilisateur inconnu" if row["locked"]: return None, "Compte verrouille (3 tentatives). Contactez l'admin." if not _verify_password(password, row["password_hash"]): attempts = row["failed_attempts"] + 1 if attempts >= 3: self._exec("UPDATE users SET failed_attempts=?, locked=1 WHERE id=?", (attempts, row["id"])) else: self._exec("UPDATE users SET failed_attempts=? WHERE id=?", (attempts, row["id"])) self._commit() remaining = 3 - attempts if remaining <= 0: return None, "Compte verrouille apres 3 echecs." return None, f"Mot de passe incorrect ({remaining} essai(s) restant(s))" # Succes self._exec("UPDATE users SET failed_attempts=0, last_login=datetime('now','localtime') WHERE id=?", (row["id"],)) self._commit() return dict(row), None def change_password(self, username, new_password): h = _hash_password(new_password) self._exec("UPDATE users SET password_hash=?, must_change_pwd=0 WHERE username=?", (h, username)) self._commit() def create_user(self, username, password, role="operator"): try: h = _hash_password(password) self._exec("INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,1)", (username, h, role)) self._commit() return True except sqlite3.IntegrityError: return False def delete_user(self, username): self._exec("DELETE FROM users WHERE username=? AND username != 'admin'", (username,)) self._commit() def unlock_user(self, username): self._exec("UPDATE users SET locked=0, failed_attempts=0 WHERE username=?", (username,)) self._commit() def reset_password(self, username, new_password): h = _hash_password(new_password) self._exec("UPDATE users SET password_hash=?, must_change_pwd=1, locked=0, failed_attempts=0 WHERE username=?", (h, username)) self._commit() def list_users(self): return [dict(r) for r in self._exec("SELECT * FROM users ORDER BY username").fetchall()] def log_action(self, username, action, details=""): self._exec("INSERT INTO audit_log (username, action, details) VALUES (?,?,?)", (username, action, details)) self._commit() def get_logs(self, limit=500): return [dict(r) for r in self._exec( "SELECT * FROM audit_log ORDER BY id DESC LIMIT ?", (limit,)).fetchall()] # ============================================================================== # SPLUNK HEC — envoi de logs vers Splunk # ============================================================================== def send_to_splunk(settings, event_data): """Envoie un evenement vers Splunk via HEC (HTTP Event Collector). settings doit contenir: splunk_url, splunk_token, splunk_index, splunk_enabled event_data: dict avec les champs a envoyer""" if not settings.get("splunk_enabled") or settings.get("splunk_enabled") == "false": return url = settings.get("splunk_url", "").strip() token = settings.get("_splunk_token_session", settings.get("splunk_token", "")).strip() if not url or not token: return try: import urllib.request import ssl payload = json.dumps({ "index": settings.get("splunk_index", "main"), "sourcetype": "patch_manager", "source": "sanef_patch_manager_v2", "host": socket.gethostname(), "event": event_data, }).encode("utf-8") req = urllib.request.Request( url.rstrip("/") + "/services/collector/event", data=payload, headers={ "Authorization": f"Splunk {token}", "Content-Type": "application/json", }, method="POST") # Proxy SANEF si configure proxy_url = settings.get("proxy_url", "") if proxy_url: handler = urllib.request.ProxyHandler({"https": proxy_url, "http": proxy_url}) opener = urllib.request.build_opener(handler) else: ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_peer = False handler = urllib.request.HTTPSHandler(context=ctx) opener = urllib.request.build_opener(handler) opener.open(req, timeout=5) except Exception: pass # Silencieux — ne jamais bloquer le patching pour un log # ============================================================================== # WINDOW CENTERING — toujours sur le meme ecran # ============================================================================== def center_window(win, width=None, height=None, parent=None): """Centre une fenetre sur le meme ecran que le parent (ou le curseur si pas de parent)""" win.update_idletasks() if width is None: width = win.winfo_width() if height is None: height = win.winfo_height() try: user32 = ctypes.windll.user32 class POINT(ctypes.Structure): _fields_ = [("x", ctypes.c_long), ("y", ctypes.c_long)] class RECT(ctypes.Structure): _fields_ = [("left", ctypes.c_long), ("top", ctypes.c_long), ("right", ctypes.c_long), ("bottom", ctypes.c_long)] class MONITORINFO(ctypes.Structure): _fields_ = [("cbSize", ctypes.c_ulong), ("rcMonitor", RECT), ("rcWork", RECT), ("dwFlags", ctypes.c_ulong)] MONITOR_DEFAULTTONEAREST = 2 if parent is not None: # Centrer sur l'ecran du parent px = parent.winfo_x() + parent.winfo_width() // 2 py = parent.winfo_y() + parent.winfo_height() // 2 pt = POINT(px, py) else: # Centrer sur l'ecran du curseur pt = POINT() user32.GetCursorPos(ctypes.byref(pt)) hmon = user32.MonitorFromPoint(pt, MONITOR_DEFAULTTONEAREST) mi = MONITORINFO() mi.cbSize = ctypes.sizeof(MONITORINFO) user32.GetMonitorInfoW(hmon, ctypes.byref(mi)) work = mi.rcWork sx = work.left + (work.right - work.left - width) // 2 sy = work.top + (work.bottom - work.top - height) // 2 except Exception: if parent is not None: sx = parent.winfo_x() + (parent.winfo_width() - width) // 2 sy = parent.winfo_y() + (parent.winfo_height() - height) // 2 else: sx = (win.winfo_screenwidth() - width) // 2 sy = (win.winfo_screenheight() - height) // 2 win.geometry(f"{width}x{height}+{sx}+{sy}") # ============================================================================== # SETTINGS # ============================================================================== DEFAULT_SETTINGS = { "keyfile": r"C:\scripts\id_rsa_cybglobal.pem", "keyfile2": r"C:\scripts\id_rsa_cybsecope.ppk", "cybr_user": "CYBP01336", "target_user": "cybsecope", "psmp": "psmp.sanef.fr", "timeout": 20, "parallelism": 3, "excel_file": "", "patcher": "", "vs_user": "", "splunk_enabled": "false", "splunk_url": "", "splunk_index": "main", "proxy_url": "http://proxy.sanef.fr:8080", } def load_settings(): if os.path.exists(SETTINGS_FILE): try: with open(SETTINGS_FILE, "r") as f: s = json.load(f) DEFAULT_SETTINGS.update(s) except Exception: pass return dict(DEFAULT_SETTINGS) def save_settings(settings): try: with open(SETTINGS_FILE, "w") as f: json.dump(settings, f, indent=2) except Exception: pass # ============================================================================== # EXCEL READER # ============================================================================== def get_week_sheet(wb): """Trouve la feuille dont le nom correspond à la semaine courante""" week_num = date.today().isocalendar()[1] candidates = [f"S{week_num}", f"S{week_num:02d}", f"Semaine {week_num}", f"W{week_num}", str(week_num)] for name in candidates: if name in wb.sheetnames: return name # Retourner toutes les feuilles pour que l'utilisateur choisisse return None # Couleur vert Excel = patché OK EXCEL_GREEN = "FF00B050" def read_excel_servers(filepath, sheet_name=None): """Lit le fichier Excel et retourne la liste des serveurs Linux + couleurs. Utilise une copie temporaire pour eviter les conflits OneDrive.""" import shutil, tempfile, io tmp_dir = tempfile.mkdtemp() tmp_path = os.path.join(tmp_dir, os.path.basename(filepath)) wb = None # Methode 1: copie fichier try: shutil.copy2(filepath, tmp_path) wb = openpyxl.load_workbook(tmp_path, data_only=True) except Exception: pass # Methode 2: lecture binaire en memoire (bypass lock OneDrive) if wb is None: try: with open(filepath, "rb") as f: data = io.BytesIO(f.read()) wb = openpyxl.load_workbook(data, data_only=True) except Exception: pass # Methode 3: ouverture directe if wb is None: wb = openpyxl.load_workbook(filepath, data_only=True) if sheet_name is None: sheet_name = get_week_sheet(wb) if sheet_name is None: return None, wb.sheetnames # retourner la liste pour que l'user choisisse ws = wb[sheet_name] # Lire les en-têtes (première ligne non vide) headers = {} header_row = None for row_idx, row in enumerate(ws.iter_rows(max_row=5), start=1): vals = [c.value for c in row if c.value is not None] if len(vals) >= 3: headers = {str(c.value).strip().lower(): c.column - 1 for c in row if c.value is not None} header_row = row_idx break if not headers: return [], wb.sheetnames # Mapping champs (tolérance casse/accents) def find_col(keys): for k in keys: for h, idx in headers.items(): if k.lower() in h.lower(): return idx return None col_server = find_col(["asset name", "nom du serveur", "serveur", "hostname", "server", "nom"]) col_os = find_col(["os", "système", "systeme"]) col_env = find_col(["environnement", "environment", "env"]) col_domain = find_col(["domaine", "domain"]) col_app = find_col(["nom complet", "application", "nom_complet"]) col_accord = find_col(["accord"]) col_date = find_col(["date du patch", "date patch", "date heure", "date_heure", "date patching", "date"]) col_patcher = find_col(["intervenant", "patcheur", "technicien"]) servers = [] for row in ws.iter_rows(min_row=header_row + 1): vals = [c.value for c in row] cells = list(row) if not vals or all(v is None for v in vals): continue def get(col): if col is None or col >= len(vals): return "" v = vals[col] return str(v).strip() if v is not None else "" os_val = get(col_os) if "linux" not in os_val.lower(): continue server_name = get(col_server) if not server_name: continue # Parsing date date_val = None if col_date is not None and col_date < len(vals): raw_date = vals[col_date] if isinstance(raw_date, datetime): date_val = raw_date.date() elif isinstance(raw_date, date): date_val = raw_date elif raw_date: try: date_val = datetime.strptime(str(raw_date).strip()[:10], "%Y-%m-%d").date() except Exception: try: date_val = datetime.strptime(str(raw_date).strip()[:10], "%d/%m/%Y").date() except Exception: date_val = None # Accord — règle stricte : # colonne absente OU cellule vide = NON (on ne patche pas sans accord explicite) # Seul "oui" / True dans la cellule = accord confirmé if col_accord is None or col_accord >= len(vals): raw_accord = "non" else: raw_v = vals[col_accord] if raw_v is None or str(raw_v).strip() == "": raw_accord = "non" # vide = pas d'accord elif isinstance(raw_v, bool): raw_accord = "oui" if raw_v else "non" else: v_str = str(raw_v).strip().lower() raw_accord = "oui" if v_str == "oui" else "non" # Détecter si la cellule serveur a un fond vert = déjà patché already_patched = False if col_server is not None and col_server < len(cells): try: fill = cells[col_server].fill bg = fill.fgColor.rgb if fill and fill.fgColor else "" if bg == EXCEL_GREEN or bg.upper() == EXCEL_GREEN: already_patched = True except Exception: pass servers.append({ "server": server_name, "os": os_val, "env": get(col_env), "domain": get(col_domain), "app": get(col_app), "accord": raw_accord if raw_accord else "non", "date_patch": date_val, "patcher": (get(col_patcher) or "").strip(), "selected": False, "snap_done": False, "is_physical": False, "status": "✅ DÉJÀ PATCHÉ" if already_patched else "—", "patch_status": "PATCHED" if already_patched else "—", "patch_detail": "Fond vert Excel" if already_patched else "", "reboot_required": False, "already_patched": already_patched, "excel_row": cells[0].row if cells else None, }) wb.close() # Nettoyage copie temporaire try: os.remove(tmp_path) os.rmdir(tmp_dir) except Exception: pass return servers, None # ============================================================================== # SSH HELPERS # ============================================================================== def build_fqdn(server, env): if "." in server: return [server] env_low = env.lower() if "recette" in env_low or "rec" in env_low: return [server, f"{server}.sanef-rec.fr"] else: return [server, f"{server}.sanef.groupe"] def load_key(keyfile): if not keyfile or not os.path.exists(keyfile.strip('"').strip("'")): return None keyfile = keyfile.strip('"').strip("'") for cls in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey]: try: return cls.from_private_key_file(keyfile) except Exception: continue return None def ssh_connect(server, env, settings, pkey, pkey2, cyb_password=None): """Connexion SSH avec fallback FQDN et clé. Retourne (client, hostname_effectif)""" candidates = build_fqdn(server, env) is_prod = "prod" in env.lower() or "production" in env.lower() for hostname in candidates: if is_prod and cyb_password: # CyberArk keyboard-interactive username = f"{settings['cybr_user']}@{settings['target_user']}@{hostname}" password = cyb_password try: def handler(title, instructions, prompt_list): return [password] * len(prompt_list) transport = paramiko.Transport((settings["psmp"], 22)) transport.connect() transport.auth_interactive(username, handler) client = paramiko.SSHClient() client._transport = transport client._eff_host = hostname return client, hostname except Exception: continue else: # SSH direct avec clé for key in [k for k in [pkey, pkey2] if k]: try: client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname=hostname, port=22, username=settings["target_user"], pkey=key, timeout=settings["timeout"], look_for_keys=False, allow_agent=False) client._eff_host = hostname return client, hostname except Exception: continue return None, None def run_cmd(client, cmd, timeout=300): try: _, stdout, stderr = client.exec_command(cmd, timeout=timeout) out = stdout.read().decode("utf-8", errors="ignore").strip() err = stderr.read().decode("utf-8", errors="ignore").strip() return out, err except Exception as e: return "", str(e) def detect_physical(client): """Détecte si le serveur est physique ou VM""" out, _ = run_cmd(client, "sudo virt-what 2>/dev/null || echo UNKNOWN", 10) if not out or out == "UNKNOWN": out2, _ = run_cmd(client, "sudo dmidecode -s system-manufacturer 2>/dev/null || echo UNKNOWN", 10) vmware_kw = ["vmware", "virtualbox", "kvm", "xen", "qemu", "microsoft corporation"] if any(k in out2.lower() for k in vmware_kw): return False # VM phys_kw = ["hp", "dell", "lenovo", "ibm", "supermicro", "cisco"] if any(k in out2.lower() for k in phys_kw): return True # Physique if out and out != "UNKNOWN": return False # virt-what a trouvé quelque chose = VM return False # par défaut supposer VM (plus sûr) def build_yum_command(domain, packages, exclude_kernel, dryrun): """Construit la commande yum selon le domaine""" domain_low = domain.lower() is_flux_libre = "flux libre" in domain_low if is_flux_libre: excludes = YUM_EXCLUDES_FLUX_LIBRE else: excludes = YUM_EXCLUDES_STD if exclude_kernel: excludes += " --exclude=*kernel*" if dryrun: if packages: cmd = f"sudo yum check-update {packages} 2>/dev/null | grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]' | head -20" else: cmd = f"sudo yum check-update {excludes} 2>/dev/null | grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]' | head -30" else: if packages: cmd = f"sudo yum update -y {packages} 2>&1 | tail -25" else: cmd = f"sudo yum update -y {excludes} 2>&1 | tail -25" return cmd # ============================================================================== # LOGIN DIALOG # ============================================================================== class LoginDialog(tk.Toplevel): def __init__(self, parent, db): super().__init__(parent) self.db = db self.result = None # {"username":..., "role":...} ou None self.title("SANEF Patch Manager - Connexion") self.configure(bg="#1e1e2e") self.resizable(False, False) self.protocol("WM_DELETE_WINDOW", self._quit) self._build() center_window(self, 420, 340) self.grab_set() self.bind("", lambda e: self._login()) self.wait_window() def _build(self): BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa" tk.Label(self, text="SANEF PATCH MANAGER", bg="#181825", fg=ACCENT, font=("Consolas", 14, "bold"), pady=10).pack(fill="x") tk.Label(self, text="Authentification requise", bg=BG, fg="#6c7086", font=("Consolas", 10)).pack(pady=(10, 15)) f = tk.Frame(self, bg=BG) f.pack(pady=5) tk.Label(f, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 11), width=14, anchor="w").grid(row=0, column=0, padx=8, pady=6) self.user_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11), insertbackground=FG, width=22) self.user_entry.grid(row=0, column=1, padx=8, pady=6) self.user_entry.focus_set() tk.Label(f, text="Mot de passe :", bg=BG, fg=FG, font=("Consolas", 11), width=14, anchor="w").grid(row=1, column=0, padx=8, pady=6) self.pwd_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11), insertbackground=FG, width=22, show="*") self.pwd_entry.grid(row=1, column=1, padx=8, pady=6) self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8", font=("Consolas", 10)) self.msg_label.pack(pady=8) bf = tk.Frame(self, bg=BG) bf.pack(pady=10) tk.Button(bf, text="Connexion", bg="#40a02b", fg="white", font=("Consolas", 11, "bold"), padx=20, pady=4, command=self._login).pack(side="left", padx=8) tk.Button(bf, text="Quitter", bg="#313244", fg="#cdd6f4", font=("Consolas", 10), padx=12, pady=4, command=self._quit).pack(side="left", padx=8) tk.Label(self, text=f"SQATM Patch Manager v{VERSION}", bg=BG, fg="#45475a", font=("Consolas", 9)).pack(side="bottom", pady=5) def _login(self): username = self.user_entry.get().strip() password = self.pwd_entry.get() if not username or not password: self.msg_label.configure(text="Saisir utilisateur et mot de passe") return user, error = self.db.authenticate(username, password) if error: self.msg_label.configure(text=error) self.pwd_entry.delete(0, "end") return self.db.log_action(username, "LOGIN", f"role={user['role']}") self.result = user self.destroy() def _quit(self): self.result = None self.destroy() # ============================================================================== # CHANGE PASSWORD DIALOG # ============================================================================== class ChangePasswordDialog(tk.Toplevel): def __init__(self, parent, db, username, forced=False): super().__init__(parent) self._parent = parent self.db = db self.username = username self.forced = forced self.success = False self.title("Changement de mot de passe") self.configure(bg="#1e1e2e") self.resizable(False, False) if forced: self.protocol("WM_DELETE_WINDOW", lambda: None) self._build() center_window(self, 420, 300, parent=parent) self.grab_set() self.bind("", lambda e: self._change()) self.wait_window() def _build(self): BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa" msg = "Premier login : vous devez changer votre mot de passe" if self.forced else "Changement de mot de passe" tk.Label(self, text=msg, bg="#181825", fg=ACCENT, font=("Consolas", 11, "bold"), pady=10).pack(fill="x") f = tk.Frame(self, bg=BG) f.pack(pady=15) tk.Label(f, text="Nouveau mdp :", bg=BG, fg=FG, font=("Consolas", 10), width=16, anchor="w").grid(row=0, column=0, padx=8, pady=6) self.new_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 10), insertbackground=FG, width=22, show="*") self.new_entry.grid(row=0, column=1, padx=8, pady=6) self.new_entry.focus_set() tk.Label(f, text="Confirmer :", bg=BG, fg=FG, font=("Consolas", 10), width=16, anchor="w").grid(row=1, column=0, padx=8, pady=6) self.confirm_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 10), insertbackground=FG, width=22, show="*") self.confirm_entry.grid(row=1, column=1, padx=8, pady=6) self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8", font=("Consolas", 10)) self.msg_label.pack(pady=5) tk.Button(self, text="Valider", bg="#40a02b", fg="white", font=("Consolas", 11, "bold"), padx=20, pady=4, command=self._change).pack(pady=10) def _change(self): p1 = self.new_entry.get() p2 = self.confirm_entry.get() if len(p1) < 4: self.msg_label.configure(text="Minimum 4 caracteres") return if p1 != p2: self.msg_label.configure(text="Les mots de passe ne correspondent pas") return self.db.change_password(self.username, p1) self.db.log_action(self.username, "CHANGE_PASSWORD", "") self.success = True self.destroy() # ============================================================================== # DIALOGUE SETTINGS # ============================================================================== class SettingsDialog(tk.Toplevel): def __init__(self, parent, settings): super().__init__(parent) self._parent = parent self.title("Parametres") self.configure(bg="#1e1e2e") self.resizable(True, True) self.settings = dict(settings) self.result = None self._build() # Taille adaptee a l'ecran sh = parent.winfo_screenheight() h = min(780, int(sh * 0.85)) center_window(self, 600, h, parent=parent) self.grab_set() self.wait_window() def _build(self): BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa" # Titre fixe en haut tk.Label(self, text="PARAMETRES", bg="#313244", fg=ACCENT, font=("Consolas",13,"bold"), pady=8).pack(fill="x") # Zone scrollable canvas = tk.Canvas(self, bg=BG, highlightthickness=0) scrollbar = ttk.Scrollbar(self, orient="vertical", command=canvas.yview) self.scroll_frame = tk.Frame(canvas, bg=BG) self.scroll_frame.bind("", lambda e: canvas.configure(scrollregion=canvas.bbox("all"))) canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) canvas.pack(side="left", fill="both", expand=True) scrollbar.pack(side="right", fill="y") # Scroll molette canvas.bind_all("", lambda e: canvas.yview_scroll(int(-1*(e.delta/120)), "units")) sf = self.scroll_frame def row(label, key, default=""): f = tk.Frame(sf, bg=BG) f.pack(fill="x", padx=20, pady=4) tk.Label(f, text=label, bg=BG, fg=FG, font=("Consolas",10), width=22, anchor="w").pack(side="left") var = tk.StringVar(value=self.settings.get(key, default)) setattr(self, f"var_{key}", var) tk.Entry(f, textvariable=var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32).pack(side="left", padx=4) return var tk.Label(sf, text="SSH — Recette", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Cle SSH principale :", "keyfile") row("Cle SSH secondaire :", "keyfile2") tk.Label(sf, text="CyberArk — Production", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Compte CyberArk :", "cybr_user", "CYBP01336") row("Utilisateur cible :", "target_user", "cybsecope") row("PSMP hostname :", "psmp", "psmp.sanef.fr") tk.Label(sf, text="Identite patcheur", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Nom du patcheur :", "patcher", "") tk.Label(sf, text="vSphere — Snapshots", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Utilisateur vCenter :", "vs_user", "") f_pwd = tk.Frame(sf, bg=BG) f_pwd.pack(fill="x", padx=20, pady=4) tk.Label(f_pwd, text="Mot de passe vCenter :", bg=BG, fg=FG, font=("Consolas",10), width=22, anchor="w").pack(side="left") self.var_vs_pwd = tk.StringVar(value="") # Jamais pre-rempli tk.Entry(f_pwd, textvariable=self.var_vs_pwd, show="*", bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32).pack(side="left", padx=4) tk.Label(f_pwd, text="(non stocke)", bg=BG, fg="#6c7086", font=("Consolas",8)).pack(side="left", padx=4) tk.Label(sf, text="Connexion SSH", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) row("Timeout SSH (s) :", "timeout", "20") row("Parallelisme :", "parallelism", "3") tk.Label(sf, text="Splunk HEC (logs)", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2)) self.var_splunk_enabled = tk.BooleanVar( value=self.settings.get("splunk_enabled", "false") == "true") tk.Checkbutton(sf, text="Activer l'envoi vers Splunk", variable=self.var_splunk_enabled, bg=BG, fg=FG, selectcolor=BG2, activebackground=BG, font=("Consolas",10)).pack(anchor="w", padx=20) row("URL HEC :", "splunk_url", "https://splunk.sanef.fr:8088") # Token non stocke dans le JSON — session only f_tok = tk.Frame(sf, bg=BG) f_tok.pack(fill="x", padx=20, pady=4) tk.Label(f_tok, text="Token HEC :", bg=BG, fg=FG, font=("Consolas",10), width=22, anchor="w").pack(side="left") self.var_splunk_token = tk.StringVar(value="") # Jamais pre-rempli tk.Entry(f_tok, textvariable=self.var_splunk_token, show="*", bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32).pack(side="left", padx=4) tk.Label(f_tok, text="(non stocke)", bg=BG, fg="#6c7086", font=("Consolas",8)).pack(side="left", padx=4) row("Index :", "splunk_index", "main") row("Proxy (optionnel) :", "proxy_url", "http://proxy.sanef.fr:8080") # Boutons en bas du scroll bf = tk.Frame(sf, bg=BG) bf.pack(pady=16) tk.Button(bf, text="Sauvegarder", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), padx=12, pady=6, command=self._save).pack(side="left", padx=8) tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4", font=("Consolas",10), padx=12, pady=6, command=self.destroy).pack(side="left", padx=8) def _save(self): keys = ["keyfile","keyfile2","cybr_user","target_user","psmp","timeout","parallelism", "patcher","vs_user","splunk_url","splunk_token","splunk_index","proxy_url"] for k in keys: var = getattr(self, f"var_{k}", None) if var: val = var.get().strip() if k in ("timeout","parallelism"): try: val = int(val) except: val = DEFAULT_SETTINGS.get(k, 20) self.settings[k] = val # Splunk enabled (stocke dans JSON — pas sensible) if hasattr(self, "var_splunk_enabled"): self.settings["splunk_enabled"] = "true" if self.var_splunk_enabled.get() else "false" # vCenter pwd et Splunk token : session only, jamais dans le JSON if hasattr(self, "var_vs_pwd"): self.settings["_vs_pwd_session"] = self.var_vs_pwd.get() if hasattr(self, "var_splunk_token"): self.settings["_splunk_token_session"] = self.var_splunk_token.get() self.result = self.settings # Sauvegarder dans JSON SANS les secrets to_save = {k: v for k, v in self.settings.items() if not k.startswith("_")} save_settings(to_save) self.destroy() # ============================================================================== # DIALOGUE CONFIRMATION COMMANDE YUM # ============================================================================== class YumConfirmDialog(tk.Toplevel): def __init__(self, parent, server, cmd): super().__init__(parent) self.title("Confirmation commande patch") self.configure(bg="#1e1e2e") self.result = None # "apply", "apply_all", "cancel" self.cmd_var = tk.StringVar(value=cmd) self._build(server, cmd) center_window(self, 680, 340, parent=parent) self.grab_set() self.wait_window() def _build(self, server, cmd): BG = "#1e1e2e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"; YELLOW = "#f9e2af" tk.Label(self, text="⚠️ CONFIRMATION COMMANDE PATCH", bg="#313244", fg=ACCENT, font=("Consolas",12,"bold"), pady=8).pack(fill="x") tk.Label(self, text=f"Serveur : {server}", bg=BG, fg=YELLOW, font=("Consolas",11)).pack(anchor="w", padx=16, pady=6) tk.Label(self, text="Commande à exécuter :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", padx=16) cmd_entry = tk.Text(self, height=5, bg="#11111b", fg="#a6e3a1", font=("Consolas",10), wrap="word", insertbackground=FG) cmd_entry.insert("1.0", cmd) cmd_entry.pack(fill="x", padx=16, pady=4) self._cmd_text = cmd_entry bf = tk.Frame(self, bg=BG) bf.pack(pady=12) tk.Button(bf, text="▶ Appliquer", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), padx=10, pady=6, command=lambda: self._set("apply")).pack(side="left", padx=6) tk.Button(bf, text="▶▶ Appliquer à TOUS", bg="#1e66f5", fg="white", font=("Consolas",11,"bold"), padx=10, pady=6, command=lambda: self._set("apply_all")).pack(side="left", padx=6) tk.Button(bf, text="✗ Annuler ce serveur", bg="#d20f39", fg="white", font=("Consolas",10), padx=10, pady=6, command=lambda: self._set("cancel")).pack(side="left", padx=6) def _set(self, val): self.result = val self.final_cmd = self._cmd_text.get("1.0", "end").strip() self.destroy() # ============================================================================== # APPLICATION PRINCIPALE # ============================================================================== class PatchManagerV2: def __init__(self, root, db, current_user): self.root = root self.db = db self.current_user = current_user # {"username":..., "role":...} self.root.title(f"SANEF Linux Patch Manager v{VERSION} — {current_user['username']} ({current_user['role']})") self.root.configure(bg="#1e1e2e") # Plein ecran sur le moniteur ou se trouve la fenetre try: center_window(root, root.winfo_screenwidth(), root.winfo_screenheight(), parent=root) root.after(100, lambda: root.state("zoomed")) except Exception: root.state("zoomed") self.settings = load_settings() self.servers = [] self.results = [] self.running = False self.pkey = None self.pkey2 = None self.cyb_pwd = None self.apply_all_cmd = False self.is_current_week = True # True = semaine courante (patch autorise) self._build_ui() self._reload_keys() self.audit("APP_START", "") # ========================================================================== # COULEURS # ========================================================================== BG = "#1e1e2e" BG2 = "#2a2a3e" FG = "#cdd6f4" ACCENT = "#89b4fa" GREEN = "#a6e3a1" RED = "#f38ba8" YELLOW = "#f9e2af" ORANGE = "#fab387" BTN = "#313244" # ========================================================================== # AUDIT HELPER # ========================================================================== def audit(self, action, details=""): username = self.current_user["username"] self.db.log_action(username, action, details) # Envoi Splunk en arriere-plan (non bloquant) threading.Thread(target=send_to_splunk, args=(self.settings, { "action": action, "user": username, "details": details, "timestamp": datetime.now().isoformat(), }), daemon=True).start() # ========================================================================== # BUILD UI # ========================================================================== def _build_ui(self): BG=self.BG; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # Style ttk style = ttk.Style() style.theme_use("clam") style.configure("TFrame", background=self.BG) style.configure("TLabel", background=self.BG, foreground=self.FG, font=("Consolas",10)) style.configure("Treeview", background=self.BG2, foreground=self.FG, fieldbackground=self.BG2, font=("Consolas",9), rowheight=22) style.configure("Treeview.Heading", background=self.BTN, foreground=self.ACCENT, font=("Consolas",10,"bold")) style.map("Treeview", background=[("selected","#45475a")]) style.configure("TCombobox", fieldbackground=self.BG2, foreground=self.FG, background=self.BG2, font=("Consolas",10)) style.configure("TCheckbutton",background=self.BG, foreground=self.FG, font=("Consolas",10)) # ── Barre titre ──────────────────────────────────────────────────────── tb = tk.Frame(self.root, bg="#181825", pady=6) tb.pack(fill="x") tk.Label(tb, text=f"⚡ SANEF LINUX PATCH MANAGER v{VERSION}", bg="#181825", fg=ACCENT, font=("Consolas",14,"bold")).pack(side="left", padx=12) tk.Label(tb, text=f"Semaine {date.today().isocalendar()[1]} | {date.today():%d/%m/%Y}", bg="#181825", fg="#6c7086", font=("Consolas",10)).pack(side="left", padx=12) tk.Button(tb, text="A propos", bg=BTN, fg="#6c7086", font=("Consolas",9), pady=2, command=self._show_about).pack(side="right", padx=4) tk.Button(tb, text="Mon MDP", bg=BTN, fg="#fab387", font=("Consolas",9), pady=2, command=self._change_my_password).pack(side="right", padx=4) tk.Button(tb, text="Settings", bg=BTN, fg=FG, font=("Consolas",10), pady=2, command=self._open_settings).pack(side="right", padx=12) # ── Notebook (onglets) ───────────────────────────────────────────────── nb_style = ttk.Style() nb_style.configure("TNotebook", background=self.BG, borderwidth=0) nb_style.configure("TNotebook.Tab", background=self.BTN, foreground=self.FG, font=("Consolas",10,"bold"), padding=[12,4]) nb_style.map("TNotebook.Tab", background=[("selected","#313244")], foreground=[("selected",self.ACCENT)]) self.nb = ttk.Notebook(self.root) self.nb.pack(fill="both", expand=True, padx=6, pady=4) # Onglets self.tab_servers = tk.Frame(self.nb, bg=BG) self.tab_patch = tk.Frame(self.nb, bg=BG) self.tab_post = tk.Frame(self.nb, bg=BG) self.tab_report = tk.Frame(self.nb, bg=BG) self.tab_audit = tk.Frame(self.nb, bg=BG) self.nb.add(self.tab_servers, text="1. Selection serveurs") self.nb.add(self.tab_patch, text="2. Patch") self.nb.add(self.tab_post, text="3. Post-patching") self.nb.add(self.tab_report, text="4. Rapport") if self.current_user["role"] == "admin": self.nb.add(self.tab_audit, text="5. Audit") self.tab_users = tk.Frame(self.nb, bg=BG) self.nb.add(self.tab_users, text="6. Utilisateurs") self._build_tab_servers() self._build_tab_patch() self._build_tab_post() self._build_tab_report() if self.current_user["role"] == "admin": self._build_tab_audit() self._build_tab_users() # Viewer = onglets patch/post desactives if self.current_user["role"] == "viewer": self.nb.tab(self.tab_patch, state="disabled") self.nb.tab(self.tab_post, state="disabled") # ========================================================================== # ONGLET 1 — SÉLECTION SERVEURS # ========================================================================== def _build_tab_servers(self): tab = self.tab_servers BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # ── Ligne 1 : Excel ─────────────────────────────────────────────────── ef = tk.Frame(tab, bg=BG) ef.pack(fill="x", padx=10, pady=6) tk.Label(ef, text="📊 Fichier Excel :", bg=BG, fg=FG, font=("Consolas",10)).pack(side="left") self.excel_var = tk.StringVar(value=self.settings.get("excel_file","")) tk.Entry(ef, textvariable=self.excel_var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=55).pack(side="left", padx=6) tk.Button(ef, text="Parcourir", bg=BTN, fg=FG, font=("Consolas",9), command=self._browse_excel).pack(side="left", padx=2) # Feuille tk.Label(ef, text="Feuille :", bg=BG, fg=FG, font=("Consolas",10)).pack(side="left", padx=(12,2)) self.sheet_var = tk.StringVar() self.sheet_cb = ttk.Combobox(ef, textvariable=self.sheet_var, state="readonly", width=10) self.sheet_cb.pack(side="left", padx=2) self.sheet_cb.bind("<>", lambda e: self._load_sheet()) # Navigation rapide semaines tk.Button(ef, text="◀", bg=BTN, fg=FG, font=("Consolas",10), width=2, command=self._prev_week).pack(side="left", padx=1) tk.Button(ef, text="▶", bg=BTN, fg=FG, font=("Consolas",10), width=2, command=self._next_week).pack(side="left", padx=1) tk.Label(ef, text="S. courante", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(side="left", padx=2) tk.Button(ef, text="↺", bg=BTN, fg=self.ACCENT, font=("Consolas",10), width=2, command=self._goto_current_week).pack(side="left", padx=1) tk.Button(ef, text="Charger", bg="#1e66f5", fg="white", font=("Consolas",10,"bold"), pady=3, command=self._load_excel).pack(side="left", padx=8) # Indicateur semaine courante/visu self.week_mode_lbl = tk.Label(ef, text="", bg=self.BG, font=("Consolas",10,"bold")) self.week_mode_lbl.pack(side="left", padx=8) # ── Ligne 2 : Filtres ───────────────────────────────────────────────── ff = tk.Frame(tab, bg=BG) ff.pack(fill="x", padx=10, pady=4) tk.Label(ff, text="Filtres :", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(side="left") # Date du jour self.filter_today_var = tk.BooleanVar(value=False) tk.Checkbutton(ff, text="📅 Aujourd'hui uniquement", variable=self.filter_today_var, bg=BG, fg=self.YELLOW, selectcolor=BG2, activebackground=BG, font=("Consolas",10)).pack(side="left", padx=8) # Filtre Intervenant — visible uniquement pour admin self.filter_patcher_var = tk.StringVar(value="Tous") if self.current_user["role"] == "admin": tk.Label(ff, text="Intervenant :", bg=BG, fg=self.YELLOW, font=("Consolas",10,"bold")).pack(side="left", padx=(8,2)) self.filter_patcher_cb = ttk.Combobox(ff, textvariable=self.filter_patcher_var, state="readonly", width=14) self.filter_patcher_cb.pack(side="left", padx=2) self.filter_patcher_cb.bind("<>", lambda e: self._apply_filters()) else: self.filter_patcher_cb = None # Non-admin : pas de filtre intervenant for label, attr in [("Env", "filter_env"), ("Domaine", "filter_domain"), ("Application", "filter_app")]: tk.Label(ff, text=f"{label}:", bg=BG, fg=FG, font=("Consolas",10)).pack(side="left", padx=(8,2)) var = tk.StringVar(value="Tous") setattr(self, f"{attr}_var", var) cb = ttk.Combobox(ff, textvariable=var, state="readonly", width=14) setattr(self, f"{attr}_cb", cb) cb.pack(side="left", padx=2) tk.Button(ff, text="Rechercher", bg=BTN, fg=ACCENT, font=("Consolas",10,"bold"), pady=3, command=self._apply_filters).pack(side="left", padx=6) # ── Ligne 3 : Actions sélection ─────────────────────────────────────── af = tk.Frame(tab, bg=BG) af.pack(fill="x", padx=10, pady=2) for txt, cmd in [("✓ Tout (accordé)", self._select_all_ok), ("✗ Aucun", self._select_none), ("↕ Inverser", self._invert)]: tk.Button(af, text=txt, bg=BTN, fg=FG, font=("Consolas",9), command=cmd).pack(side="left", padx=3) self.count_lbl = tk.Label(af, text="", bg=BG, fg=self.YELLOW, font=("Consolas",9)) self.count_lbl.pack(side="right", padx=10) # ── Treeview serveurs ───────────────────────────────────────────────── tree_frame = tk.Frame(tab, bg=BG) tree_frame.pack(fill="both", expand=True, padx=10, pady=4) cols = ("sel","accord","serveur","intervenant","env","domaine","app","date_patch","snap","statut") self.srv_tree = ttk.Treeview(tree_frame, columns=cols, show="headings", height=22) hdrs = {"sel":"✓","accord":"Accord","serveur":"Serveur", "intervenant":"Intervenant","env":"Env", "domaine":"Domaine","app":"Application","date_patch":"Date patch", "snap":"Snapshot","statut":"Statut"} widths = {"sel":30,"accord":70,"serveur":180,"intervenant":90,"env":80,"domaine":120, "app":150,"date_patch":90,"snap":80,"statut":100} for c in cols: self.srv_tree.heading(c, text=hdrs[c]) self.srv_tree.column(c, width=widths[c], anchor="center" if c in ("sel","accord","snap","statut") else "w") self.srv_tree.tag_configure("no_accord", foreground="#585b70", background="#1e1e2e") self.srv_tree.tag_configure("ok", foreground=self.GREEN) self.srv_tree.tag_configure("warn", foreground=self.YELLOW) self.srv_tree.tag_configure("ko", foreground=self.RED) self.srv_tree.tag_configure("selected", foreground=self.FG, background="#313244") self.srv_tree.tag_configure("normal", foreground=self.FG) self.srv_tree.tag_configure("already_patched",foreground="#1e1e2e", background="#40a02b") vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.srv_tree.yview) hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.srv_tree.xview) self.srv_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) self.srv_tree.pack(side="left", fill="both", expand=True) vsb.pack(side="right", fill="y") hsb.pack(side="bottom", fill="x") self.srv_tree.bind("", self._toggle_srv) # ── Boutons prerequis + snapshot ────────────────────────────────────── prereq_frame = tk.Frame(tab, bg=BG) prereq_frame.pack(fill="x", padx=10, pady=4) self.prereq_btn = tk.Button(prereq_frame, text="Verifier prerequis", bg="#e64553", fg="white", font=("Consolas",12,"bold"), pady=6, command=self._check_prerequisites) self.prereq_btn.pack(side="left", padx=4) self.prereq_status = tk.Label(prereq_frame, text="", bg=BG, fg=self.FG, font=("Consolas",10)) self.prereq_status.pack(side="left", padx=12) self.snap_btn = tk.Button(prereq_frame, text="Prendre snapshot vSphere", bg="#313244", fg="#6c7086", font=("Consolas",10), pady=4, state="disabled", command=self._take_snapshots) self.snap_btn.pack(side="left", padx=4) tk.Button(prereq_frame, text="▶ Aller au Patch →", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), pady=5, command=lambda: self.nb.select(self.tab_patch)).pack(side="right", padx=4) # ========================================================================== # ONGLET 2 — PATCH # ========================================================================== def _build_tab_patch(self): tab = self.tab_patch BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN top = tk.Frame(tab, bg=BG) top.pack(fill="both", expand=True, padx=10, pady=8) # Colonne gauche : options (scrollable) left_canvas = tk.Canvas(top, bg=BG, highlightthickness=0, width=380) left_sb = ttk.Scrollbar(top, orient="vertical", command=left_canvas.yview) left = tk.Frame(left_canvas, bg=BG) left.bind("", lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all"))) left_canvas.create_window((0, 0), window=left, anchor="nw") left_canvas.configure(yscrollcommand=left_sb.set) left_canvas.pack(side="left", fill="y") left_sb.pack(side="left", fill="y") left_canvas.bind_all("", lambda e: left_canvas.yview_scroll(int(-1*(e.delta/120)), "units")) tk.Label(left, text="OPTIONS PATCH", bg=BTN, fg=ACCENT, font=("Consolas",11,"bold"), padx=8, pady=4).pack(fill="x") # Preset packages tk.Label(left, text="Preset packages A PATCHER :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(8,2)) self.preset_var = tk.StringVar(value=list(PRESET_PACKAGES.keys())[0]) preset_cb = ttk.Combobox(left, textvariable=self.preset_var, values=list(PRESET_PACKAGES.keys()), state="readonly", width=30) preset_cb.pack(anchor="w") preset_cb.bind("<>", self._on_preset) # Champ packages a patcher tk.Label(left, text="Packages specifiques a PATCHER :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(8,2)) self.packages_var = tk.StringVar(value="") self.packages_entry = tk.Entry(left, textvariable=self.packages_var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32) self.packages_entry.pack(anchor="w") tk.Label(left, text="Ex: gnutls libsoup rhc", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w") # Champ packages a exclure tk.Label(left, text="Packages specifiques a EXCLURE :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(8,2)) self.custom_excludes_var = tk.StringVar(value="") self.custom_excludes_entry = tk.Entry(left, textvariable=self.custom_excludes_var, bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=32) self.custom_excludes_entry.pack(anchor="w") tk.Label(left, text="Ex: kernel* podman* docker*", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w") # Commande finale editable tk.Label(left, text="Commande finale (editable) :", bg=BG, fg=self.YELLOW, font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2)) self.final_cmd_var = tk.StringVar(value="") self.final_cmd_entry = tk.Entry(left, textvariable=self.final_cmd_var, bg="#181825", fg=self.YELLOW, font=("Consolas",9), insertbackground=self.YELLOW, width=48) self.final_cmd_entry.pack(anchor="w") tk.Button(left, text="Generer la commande", bg=BTN, fg=FG, font=("Consolas",9), command=self._preview_cmd).pack(anchor="w", pady=(4,0)) # Aide help_frame = tk.Frame(left, bg="#181825", pady=4, padx=8) help_frame.pack(fill="x", pady=(6,0)) tk.Label(help_frame, text="Preset = commande predeterminee", bg="#181825", fg="#a6e3a1", font=("Consolas",8)).pack(anchor="w") tk.Label(help_frame, text="Personnalise = packages a patcher ET/OU a exclure", bg="#181825", fg="#f9e2af", font=("Consolas",8)).pack(anchor="w") tk.Label(help_frame, text="La commande finale est toujours editable avant GO", bg="#181825", fg="#6c7086", font=("Consolas",8)).pack(anchor="w") # Options self.exclude_kernel_var = tk.BooleanVar(value=True) self.dryrun_var = tk.BooleanVar(value=True) tk.Checkbutton(left, text="☑ Exclure kernel (recommandé)", variable=self.exclude_kernel_var, bg=BG, fg=self.YELLOW, selectcolor=BG2, activebackground=BG, font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2)) tk.Checkbutton(left, text="🔍 Dry Run (simulation)", variable=self.dryrun_var, bg=BG, fg=self.ORANGE, selectcolor=BG2, activebackground=BG, font=("Consolas",10,"bold")).pack(anchor="w", pady=2) # Mot de passe CyberArk tk.Label(left, text="Mot de passe CyberArk (prod) :", bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(12,2)) self.cybpwd_var = tk.StringVar() tk.Entry(left, textvariable=self.cybpwd_var, show="●", bg=BG2, fg=FG, font=("Consolas",10), insertbackground=FG, width=28).pack(anchor="w") # GO button self.go_btn = tk.Button(left, text="🚀 GO PATCH", bg="#d20f39", fg="white", font=("Consolas",16,"bold"), pady=12, padx=20, command=self._go_patch) self.go_btn.pack(fill="x", pady=16) self.stop_btn = tk.Button(left, text="⏹ STOP", bg="#6c7086", fg="white", font=("Consolas",11,"bold"), pady=6, state="disabled", command=self._stop) self.stop_btn.pack(fill="x") # Colonne droite : log temps réel right = tk.Frame(top, bg=BG) right.pack(side="left", fill="both", expand=True) tk.Label(right, text="DÉROULÉ EN TEMPS RÉEL", bg=BTN, fg=ACCENT, font=("Consolas",11,"bold"), pady=4).pack(fill="x") self.patch_log = scrolledtext.ScrolledText( right, height=28, bg="#11111b", fg=FG, font=("Consolas",9), insertbackground=FG, state="disabled") self.patch_log.pack(fill="both", expand=True, pady=4) for tag, color in [("ok",self.GREEN),("ko",self.RED), ("warn",self.YELLOW),("info",ACCENT), ("cmd","#cba6f7")]: self.patch_log.tag_configure(tag, foreground=color) # Barre progression pf = tk.Frame(tab, bg=BG) pf.pack(fill="x", padx=10, pady=4) self.prog_lbl = tk.Label(pf, text="En attente...", bg=BG, fg=FG, font=("Consolas",9)) self.prog_lbl.pack(side="left", padx=4) self.prog_var = tk.DoubleVar() ttk.Progressbar(pf, variable=self.prog_var, maximum=100, length=600).pack(side="left", fill="x", expand=True, padx=4) # ========================================================================== # ONGLET 3 — POST-PATCHING # ========================================================================== def _build_tab_post(self): tab = self.tab_post BG=self.BG; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN tk.Label(tab, text="POST-PATCHING", bg=BTN, fg=ACCENT, font=("Consolas",12,"bold"), pady=6).pack(fill="x", padx=10, pady=8) bf = tk.Frame(tab, bg=BG) bf.pack(fill="x", padx=10, pady=4) tk.Button(bf, text="🔍 Check post-patching", bg="#1e66f5", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._post_patch_check).pack(side="left", padx=4) tk.Button(bf, text="🗑 Supprimer anciens kernels", bg="#e64553", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._remove_old_kernels).pack(side="left", padx=4) tk.Button(bf, text="↩ Undo yum", bg="#fe640b", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._undo_yum).pack(side="left", padx=4) tk.Button(bf, text="🔄 Reboot serveurs sélectionnés", bg="#8839ef", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._reboot_servers).pack(side="left", padx=4) tk.Button(bf, text="🟩 Marquer patchés dans Excel", bg="#40a02b", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._ask_mark_patched).pack(side="left", padx=4) # Deuxième rangée de boutons bf2 = tk.Frame(tab, bg=BG) bf2.pack(fill="x", padx=10, pady=2) tk.Button(bf2, text="🗑 Supprimer snapshots > 3 jours", bg="#e64553", fg="white", font=("Consolas",11,"bold"), pady=6, command=self._delete_old_snapshots).pack(side="left", padx=4) self.post_log = scrolledtext.ScrolledText( tab, bg="#11111b", fg=FG, font=("Consolas",9), insertbackground=FG, state="disabled") self.post_log.pack(fill="both", expand=True, padx=10, pady=4) for tag, color in [("ok",self.GREEN),("ko",self.RED), ("warn",self.YELLOW),("info",ACCENT)]: self.post_log.tag_configure(tag, foreground=color) # ========================================================================== # ONGLET 4 — RAPPORT PATCHING (style dashboard) # ========================================================================== def _build_tab_report(self): tab = self.tab_report BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # ── Barre titre + boutons ───────────────────────────────────────────── hf = tk.Frame(tab, bg=BTN, pady=6) hf.pack(fill="x", padx=0, pady=0) tk.Label(hf, text="RAPPORT DE PATCHING", bg=BTN, fg=ACCENT, font=("Consolas",12,"bold")).pack(side="left", padx=12) bf = tk.Frame(hf, bg=BTN) bf.pack(side="right", padx=8) tk.Button(bf, text="Generer", bg="#40a02b", fg="white", font=("Consolas",10,"bold"), pady=3, command=self._generate_report).pack(side="left", padx=4) tk.Button(bf, text="CSV", bg=BTN, fg=FG, font=("Consolas",10), pady=3, command=self._export_csv).pack(side="left", padx=4) tk.Button(bf, text="DokuWiki", bg=BTN, fg=FG, font=("Consolas",10), pady=3, command=self._export_dokuwiki).pack(side="left", padx=4) # ── Widgets KPI (ligne de compteurs) ────────────────────────────────── kpi_frame = tk.Frame(tab, bg=BG) kpi_frame.pack(fill="x", padx=10, pady=8) # Créer 6 widgets KPI self.kpi_widgets = {} kpi_defs = [ ("total", "Total traités", "#313244", FG), ("ok", "Connexion OK", "#1a4a2e", self.GREEN), ("patched", "Patchés", "#1a4a2e", self.GREEN), ("uptodate", "Déjà à jour", "#2a2a3e", ACCENT), ("reboot", "Reboot requis", "#4a3000", self.YELLOW), ("ko", "KO / Auth", "#4a1a1a", self.RED), ] for key, label, bg_col, fg_col in kpi_defs: frame = tk.Frame(kpi_frame, bg=bg_col, padx=16, pady=10, relief="flat", bd=0) frame.pack(side="left", fill="x", expand=True, padx=4) # Valeur val_lbl = tk.Label(frame, text="—", bg=bg_col, fg=fg_col, font=("Consolas",24,"bold")) val_lbl.pack() # Label tk.Label(frame, text=label, bg=bg_col, fg=fg_col, font=("Consolas",9)).pack() self.kpi_widgets[key] = val_lbl # ── Tableau détaillé résultats ───────────────────────────────────────── tk.Label(tab, text="DETAIL PAR SERVEUR", bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=12, pady=(4,2)) tree_frame = tk.Frame(tab, bg=BG) tree_frame.pack(fill="both", expand=True, padx=10, pady=4) rcols = ("serveur","env","domaine","accord","snap","connexion","patch","reboot","detail") self.report_tree = ttk.Treeview(tree_frame, columns=rcols, show="headings", height=14) rhdrs = { "serveur": "Serveur", "env": "Env", "domaine": "Domaine", "accord": "Accord", "snap": "Snapshot", "connexion": "Connexion", "patch": "Patch", "reboot": "Reboot", "detail": "Detail packages", } rwidths = { "serveur":200,"env":80,"domaine":100,"accord":70, "snap":80,"connexion":90,"patch":100,"reboot":70,"detail":280 } for c in rcols: self.report_tree.heading(c, text=rhdrs[c]) self.report_tree.column(c, width=rwidths[c], anchor="center" if c not in ("serveur","detail") else "w") self.report_tree.tag_configure("ok", foreground=self.GREEN) self.report_tree.tag_configure("ko", foreground=self.RED) self.report_tree.tag_configure("warn", foreground=self.YELLOW) self.report_tree.tag_configure("patched", foreground=self.GREEN, background="#1a2e1a") self.report_tree.tag_configure("reboot", foreground=self.YELLOW, background="#2e2a00") self.report_tree.tag_configure("uptodate",foreground=ACCENT) vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.report_tree.yview) hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.report_tree.xview) self.report_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set) self.report_tree.pack(side="left", fill="both", expand=True) vsb.pack(side="right", fill="y") hsb.pack(side="bottom", fill="x") # ── Zone log texte (optionnel, compact) ─────────────────────────────── tk.Label(tab, text="LOG BRUT", bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w", padx=12, pady=(4,0)) self.report_txt = scrolledtext.ScrolledText( tab, height=6, bg="#11111b", fg=FG, font=("Consolas",8), insertbackground=FG, state="disabled") self.report_txt.pack(fill="x", padx=10, pady=(0,6)) for tag, color in [("ok",self.GREEN),("ko",self.RED), ("warn",self.YELLOW),("info",ACCENT), ("title","#cba6f7")]: self.report_txt.tag_configure(tag, foreground=color, font=("Consolas",9,"bold") if tag=="title" else ("Consolas",8)) # ========================================================================== # HELPERS LOG # ========================================================================== def _log(self, widget, msg, tag="info"): def _do(): widget.configure(state="normal") ts = datetime.now().strftime("%H:%M:%S") widget.insert("end", f"[{ts}] {msg}\n", tag) widget.see("end") widget.configure(state="disabled") self.root.after(0, _do) def _log_patch(self, msg, tag="info"): self._log(self.patch_log, msg, tag) def _log_post(self, msg, tag="info"): self._log(self.post_log, msg, tag) # ========================================================================== # PREREQUIS — SSH + Disque + Satellite + Snapshot # ========================================================================== def _check_prerequisites(self): selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"] if not selected: messagebox.showwarning("Prerequis", "Aucun serveur selectionne avec accord=OUI") return # Recharger les cles SSH self._reload_keys() # Verifier si des serveurs prod necessitent PSMP has_prod = any("prod" in s.get("env", "").lower() for s in selected) if has_prod and not self.cyb_pwd: # Lire depuis le champ mot de passe CyberArk de l'onglet Patch pwd = self.cybpwd_var.get().strip() if hasattr(self, 'cybpwd_var') else "" if pwd: self.cyb_pwd = pwd else: # Demander le mot de passe via dialogue centre sur le parent dlg = tk.Toplevel(self.root) dlg.title("CyberArk PSMP") dlg.configure(bg="#1e1e2e") dlg.resizable(False, False) result_pwd = [None] tk.Label(dlg, text=f"Mot de passe CyberArk ({self.settings.get('cybr_user', 'CYBP01336')}) :", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(padx=20, pady=(15, 5)) pwd_var = tk.StringVar() pwd_entry = tk.Entry(dlg, textvariable=pwd_var, show="*", bg="#2a2a3e", fg="#cdd6f4", font=("Consolas", 11), insertbackground="#cdd6f4", width=30) pwd_entry.pack(padx=20, pady=5) pwd_entry.focus_set() def _ok(event=None): result_pwd[0] = pwd_var.get().strip() dlg.destroy() pwd_entry.bind("", _ok) tk.Button(dlg, text="OK", bg="#40a02b", fg="white", font=("Consolas", 10, "bold"), padx=20, command=_ok).pack(pady=10) center_window(dlg, 400, 140, parent=self.root) dlg.grab_set() dlg.wait_window() pwd = result_pwd[0] if not pwd: messagebox.showwarning("Prerequis", "Mot de passe PSMP requis pour les serveurs Production") return self.cyb_pwd = pwd if hasattr(self, 'cybpwd_var'): self.cybpwd_var.set(pwd) self.prereq_btn.configure(state="disabled", text="Verification en cours...") self.prereq_status.configure(text="") self.audit("PREREQ_START", f"{len(selected)} serveurs (prod={has_prod})") self._log_patch(f"Prerequis : {len(selected)} serveurs a verifier...", "info") def worker(): results = {} total = len(selected) for idx, s in enumerate(selected): server = s["server"] env = s.get("env", "") r = {"ssh": False, "disk_root": None, "disk_log": None, "satellite": None, "is_vm": server.lower().startswith("v")} self.root.after(0, lambda sv=server, i=idx: self.prereq_status.configure( text=f"[{i+1}/{total}] {sv}")) self._log_patch(f"\n[{idx+1}/{total}] {server} ({env})", "info") # 1. Check SSH self._log_patch(f" SSH...", "info") is_prod = "prod" in env.lower() method = "PSMP" if is_prod else "cle SSH" client, eff = ssh_connect(server, env, self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: r["ssh"] = False r["error"] = "Connexion SSH impossible" self._log_patch(f" SSH ({method}) ECHEC", "ko") results[server] = r continue r["ssh"] = True self._log_patch(f" SSH ({method}) OK : {eff}", "ok") # 2. Check disque / self._log_patch(f" Disque / ...", "info") out, _ = run_cmd(client, "df -BM / | awk 'NR==2{print $4}' | tr -d 'M'", 10) try: r["disk_root"] = int(out.strip()) except (ValueError, AttributeError): r["disk_root"] = -1 if r["disk_root"] >= 1200: self._log_patch(f" Disque / : {r['disk_root']} Mo libre (OK >= 1200)", "ok") else: self._log_patch(f" Disque / : {r['disk_root']} Mo libre (KO < 1200)", "ko") # Check disque /var/log self._log_patch(f" Disque /var/log ...", "info") out2, _ = run_cmd(client, "df -BM /var/log | awk 'NR==2{print $4}' | tr -d 'M'", 10) try: r["disk_log"] = int(out2.strip()) except (ValueError, AttributeError): r["disk_log"] = -1 if r["disk_log"] >= 500: self._log_patch(f" Disque /var/log : {r['disk_log']} Mo libre (OK >= 500)", "ok") else: self._log_patch(f" Disque /var/log : {r['disk_log']} Mo libre (KO < 500)", "ko") # 3. Check Satellite self._log_patch(f" Satellite ...", "info") sat_out, _ = run_cmd(client, "subscription-manager status 2>/dev/null | head -5", 15) if sat_out and ("current" in sat_out.lower() or "valide" in sat_out.lower()): r["satellite"] = True self._log_patch(f" Satellite : OK (enregistre)", "ok") elif sat_out and "unknown" in sat_out.lower(): r["satellite"] = False self._log_patch(f" Satellite : NON enregistre", "ko") else: r["satellite"] = None self._log_patch(f" Satellite : N/A (subscription-manager absent)", "warn") client.close() results[server] = r # Afficher les resultats dans l'UI self.root.after(0, lambda: self._show_prereq_results(results, selected)) threading.Thread(target=worker, daemon=True).start() def _show_prereq_results(self, results, selected): self.prereq_btn.configure(state="normal", text="Verifier prerequis") self._log_patch(f"\n{'='*50}", "info") self._log_patch(f"RESUME PREREQUIS", "info") self._log_patch(f"{'='*50}", "info") # Construire le rapport ok_servers = [] ko_servers = [] warn_servers = [] for s in selected: server = s["server"] r = results.get(server, {}) if not r.get("ssh"): ko_servers.append((server, "SSH connexion impossible")) s["prereq_status"] = "SSH_KO" elif r.get("disk_root", 0) >= 0 and r["disk_root"] < 1200: ko_servers.append((server, f"/ : {r['disk_root']} Mo libre (min 1200 Mo)")) s["prereq_status"] = "DISK_KO" elif r.get("disk_log", 0) >= 0 and r["disk_log"] < 500: ko_servers.append((server, f"/var/log : {r['disk_log']} Mo libre (min 500 Mo)")) s["prereq_status"] = "DISK_KO" elif r.get("satellite") is False: ko_servers.append((server, "Satellite : non enregistre")) s["prereq_status"] = "SAT_KO" else: ok_servers.append(server) s["prereq_status"] = "OK" if r.get("satellite") is None: warn_servers.append((server, "subscription-manager absent (Debian/physique ?)")) # Log resume for srv in ok_servers: self._log_patch(f" OK : {srv}", "ok") for srv, reason in warn_servers: self._log_patch(f" WARN : {srv} — {reason}", "warn") for srv, reason in ko_servers: self._log_patch(f" KO : {srv} — {reason}", "ko") self._log_patch(f"\nTotal : {len(ok_servers)} OK, {len(warn_servers)} WARN, {len(ko_servers)} KO", "info") # Dialogue resultat msg = f"Prerequis verifies : {len(selected)} serveurs\n\n" msg += f"OK : {len(ok_servers)}\n" if warn_servers: msg += f"Avertissements : {len(warn_servers)}\n" if ko_servers: msg += f"BLOQUANTS : {len(ko_servers)}\n\n" msg += "Serveurs bloques :\n" for srv, reason in ko_servers: msg += f" {srv} : {reason}\n" msg += "\nDeselectionnez les serveurs bloques pour continuer." if ko_servers: # Deselectionner automatiquement les KO for srv, _ in ko_servers: for s in self.servers: if s["server"] == srv: s["selected"] = False self._apply_filters() messagebox.showwarning("Prerequis — problemes detectes", msg) self.prereq_status.configure(text=f"{len(ko_servers)} serveurs bloques", fg=self.RED) else: messagebox.showinfo("Prerequis OK", msg) self.prereq_status.configure(text=f"{len(ok_servers)} serveurs OK", fg=self.GREEN) # Activer le bouton snapshot pour les VM vm_count = sum(1 for s in selected if s.get("prereq_status") == "OK" and s["server"].lower().startswith("v")) if vm_count > 0: self.snap_btn.configure(state="normal", bg="#e64553", fg="white") self.audit("PREREQ_DONE", f"OK={len(ok_servers)} KO={len(ko_servers)} WARN={len(warn_servers)}") # ========================================================================== # SNAPSHOT — force sans snap # ========================================================================== def _force_patch_without_snap(self, server_name): """Demande une justification pour patcher une VM sans snapshot""" dlg = tk.Toplevel(self.root) dlg.title("Patching sans snapshot") dlg.configure(bg="#1e1e2e") dlg.resizable(False, False) result = [None] tk.Label(dlg, text=f"ATTENTION — VM sans snapshot", bg="#181825", fg="#f38ba8", font=("Consolas", 12, "bold"), pady=8).pack(fill="x") tk.Label(dlg, text=f"Serveur : {server_name}", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(pady=(10, 5)) tk.Label(dlg, text="Justification obligatoire :", bg="#1e1e2e", fg="#f9e2af", font=("Consolas", 10)).pack(anchor="w", padx=20) justif_var = tk.StringVar() tk.Entry(dlg, textvariable=justif_var, bg="#2a2a3e", fg="#cdd6f4", font=("Consolas", 10), insertbackground="#cdd6f4", width=50).pack(padx=20, pady=5) def accept(): j = justif_var.get().strip() if len(j) < 5: messagebox.showwarning("Justification", "Minimum 5 caracteres") return result[0] = j dlg.destroy() bf = tk.Frame(dlg, bg="#1e1e2e") bf.pack(pady=10) tk.Button(bf, text="Forcer le patching", bg="#d20f39", fg="white", font=("Consolas", 10, "bold"), command=accept).pack(side="left", padx=8) tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4", font=("Consolas", 10), command=dlg.destroy).pack(side="left", padx=8) center_window(dlg, 500, 220, parent=self.root) dlg.grab_set() dlg.wait_window() if result[0]: self.audit("FORCE_PATCH_NO_SNAP", f"{server_name} : {result[0]}") return result[0] # ========================================================================== # A PROPOS # ========================================================================== def _show_about(self): dlg = tk.Toplevel(self.root) dlg.title("A propos") dlg.configure(bg="#1e1e2e") dlg.resizable(False, False) tk.Frame(dlg, bg="#e94560", height=3).pack(fill="x") tk.Label(dlg, text="SANEF Linux Patch Manager", bg="#1e1e2e", fg="#89b4fa", font=("Consolas", 16, "bold")).pack(pady=(20, 4)) tk.Label(dlg, text=f"Version {VERSION}", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 12)).pack() tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12) tk.Label(dlg, text="SANEF DSI — Securite Operationnelle", bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack() tk.Label(dlg, text="(KM)", bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 11)).pack(pady=(4, 0)) tk.Label(dlg, text="SECOPS 2026", bg="#1e1e2e", fg="#6c7086", font=("Consolas", 10)).pack(pady=(2, 0)) tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12) tk.Label(dlg, text="Orchestration du patching Linux\nvia Excel + SSH + vSphere", bg="#1e1e2e", fg="#6c7086", font=("Consolas", 9), justify="center").pack() tk.Button(dlg, text="Fermer", bg="#313244", fg="#cdd6f4", font=("Consolas", 10), padx=20, pady=4, command=dlg.destroy).pack(pady=16) center_window(dlg, 380, 320, parent=self.root) dlg.grab_set() # ========================================================================== # SETTINGS # ========================================================================== def _open_settings(self): dlg = SettingsDialog(self.root, self.settings) if dlg.result: self.settings = dlg.result self._reload_keys() def _reload_keys(self): self.pkey = load_key(self.settings.get("keyfile","")) self.pkey2 = load_key(self.settings.get("keyfile2","")) # ========================================================================== # ONGLET 1 — LOGIQUE # ========================================================================== def _browse_excel(self): path = filedialog.askopenfilename( filetypes=[("Excel","*.xlsx *.xls"),("All","*.*")]) if path: self.excel_var.set(path) self.settings["excel_file"] = path save_settings(self.settings) self._load_excel() def _prev_week(self): sheets = list(self.sheet_cb["values"]) if not sheets: return cur = self.sheet_var.get() idx = sheets.index(cur) if cur in sheets else 0 if idx > 0: self.sheet_var.set(sheets[idx-1]) self._load_sheet() def _next_week(self): sheets = list(self.sheet_cb["values"]) if not sheets: return cur = self.sheet_var.get() idx = sheets.index(cur) if cur in sheets else 0 if idx < len(sheets)-1: self.sheet_var.set(sheets[idx+1]) self._load_sheet() def _goto_current_week(self): week_num = date.today().isocalendar()[1] sheets = list(self.sheet_cb["values"]) for s in [f"S{week_num}", f"S{week_num:02d}"]: if s in sheets: self.sheet_var.set(s) self._load_sheet() return def _load_excel(self): path = self.excel_var.get().strip() if not path or not os.path.exists(path): messagebox.showerror("Erreur", f"Fichier introuvable :\n{path}") return self.audit("LOAD_EXCEL", path) try: import io try: with open(path, "rb") as f: data = io.BytesIO(f.read()) wb = openpyxl.load_workbook(data, read_only=True, data_only=True) except Exception: wb = openpyxl.load_workbook(path, read_only=True, data_only=True) sheets = wb.sheetnames wb.close() # Mettre à jour le combobox feuilles self.sheet_cb["values"] = sheets week_sheet = None week_num = date.today().isocalendar()[1] for s in sheets: if str(week_num) in s: week_sheet = s break self.sheet_var.set(week_sheet or (sheets[0] if sheets else "")) self._load_sheet() except Exception as e: messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}") def _load_sheet(self): path = self.excel_var.get().strip() sheet = self.sheet_var.get() if not path or not sheet: return servers, _ = read_excel_servers(path, sheet) if servers is None: messagebox.showerror("Erreur", "Aucun serveur Linux trouvé") return # Détecter si c'est la semaine courante week_num = date.today().isocalendar()[1] self.is_current_week = any(str(week_num) in sheet for sheet in [sheet]) # Mettre à jour l'indicateur visuel if self.is_current_week: self.week_mode_lbl.configure( text="✅ SEMAINE COURANTE — Patch autorisé", fg=self.GREEN) else: self.week_mode_lbl.configure( text=f"👁 VUE SEULE ({sheet}) — Patch désactivé", fg=self.YELLOW) self.servers = servers # Peupler les filtres envs = sorted(set(s["env"] for s in servers if s["env"])) domains = sorted(set(s["domain"] for s in servers if s["domain"])) apps = sorted(set(s["app"] for s in servers if s["app"])) patchers = sorted(set(s["patcher"].strip() for s in servers if s.get("patcher") and s["patcher"].strip())) self.filter_env_cb["values"] = ["Tous"] + envs self.filter_domain_cb["values"] = ["Tous"] + domains self.filter_app_cb["values"] = ["Tous"] + apps if self.filter_patcher_cb: self.filter_patcher_cb["values"] = ["Tous"] + patchers self.filter_env_var.set("Tous") self.filter_domain_var.set("Tous") self.filter_app_var.set("Tous") # Auto-selectionner le patcheur if self.current_user["role"] == "admin": patcher_setting = self.settings.get("patcher", "").strip() if patcher_setting and patcher_setting in patchers: self.filter_patcher_var.set(patcher_setting) else: self.filter_patcher_var.set("Tous") else: self.filter_patcher_var.set("Tous") # Non-admin filtre par _apply_filters self._apply_filters() # Griser/activer le bouton GO selon la semaine if hasattr(self, "go_btn"): if self.is_current_week: self.go_btn.configure(bg="#d20f39", state="normal", text="🚀 GO PATCH") else: self.go_btn.configure(bg="#45475a", state="normal", text="👁 VUE SEULE — Patch désactivé") # Vérifier snapshots si des serveurs ont accord=oui accord_servers = [s for s in self.servers if s.get("accord") == "oui" and not s.get("is_physical")] if accord_servers and VSPHERE_OK: self.root.after(200, self._prompt_vcenter_and_check_snaps) def _apply_filters(self): env_f = self.filter_env_var.get() domain_f = self.filter_domain_var.get() app_f = self.filter_app_var.get() patcher_f = self.filter_patcher_var.get() today_f = self.filter_today_var.get() today = date.today() # Non-admin : ne voit que ses serveurs (intervenant = son username) is_admin = self.current_user["role"] == "admin" my_name = self.current_user["username"].lower() filtered = [] for s in self.servers: # Filtre par utilisateur connecte (non-admin) if not is_admin: srv_patcher = (s.get("patcher", "") or "").strip().lower() if srv_patcher != my_name: continue if patcher_f != "Tous": srv_patcher = (s.get("patcher", "") or "").strip() patcher_cmp = patcher_f.strip() if srv_patcher and srv_patcher.lower() != patcher_cmp.lower(): continue if env_f != "Tous" and s["env"] != env_f: continue if domain_f != "Tous" and s["domain"] != domain_f: continue if app_f != "Tous" and s["app"] != app_f: continue if today_f and s["date_patch"] and s["date_patch"] != today: continue filtered.append(s) self._populate_tree(filtered) def _populate_tree(self, servers): for item in self.srv_tree.get_children(): self.srv_tree.delete(item) # Compteur pour gérer les doublons de nom de serveur dans la feuille seen = {} for s in servers: accord_ok = s["accord"] == "oui" sel_mark = "✓" if s.get("selected") and accord_ok else ("✗" if accord_ok else "—") accord_lbl= "✅ OUI" if accord_ok else "❌ NON" snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "—") date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else "—" if s.get("already_patched"): tag = "already_patched" elif not accord_ok: tag = "no_accord" elif s.get("selected"): tag = "selected" else: tag = "normal" # Générer un iid unique si le nom de serveur apparaît en double base_iid = s["server"] seen[base_iid] = seen.get(base_iid, 0) + 1 iid = base_iid if seen[base_iid] == 1 else f"{base_iid}_#{seen[base_iid]}" s["_iid"] = iid # stocker pour _refresh_tree_item self.srv_tree.insert("", "end", iid=iid, values=(sel_mark, accord_lbl, s["server"], s.get("patcher",""), s["env"], s["domain"], s["app"], date_str, snap_lbl, s["status"]), tags=(tag,)) n = sum(1 for s in servers if s.get("selected") and s["accord"]=="oui") self.count_lbl.configure(text=f"{len(servers)} serveurs affichés — {n} sélectionnés") def _get_server(self, name): for s in self.servers: if s["server"] == name: return s return None def _get_server_by_iid(self, iid): """Retrouver un serveur par son iid treeview (gère les doublons de noms).""" for s in self.servers: if s.get("_iid") == iid: return s # Fallback : chercher par nom (compatibilité) return self._get_server(iid) def _toggle_srv(self, event): region = self.srv_tree.identify_region(event.x, event.y) if region != "cell": return item = self.srv_tree.identify_row(event.y) if not item: return # Retrouver le serveur par son iid stocké (gère les doublons) s = self._get_server_by_iid(item) if not s or s["accord"] != "oui": return # accord obligatoire s["selected"] = not s.get("selected", False) self._refresh_tree_item(s) n = sum(1 for x in self.servers if x.get("selected") and x["accord"]=="oui") self.count_lbl.configure(text=f"— {n} sélectionnés") def _refresh_tree_item(self, s): # Chercher l'iid : d'abord _iid stocké, sinon nom serveur, sinon chercher dans l'arbre iid = s.get("_iid", s["server"]) if not self.srv_tree.exists(iid): # Fallback : chercher parmi tous les items de l'arbre par valeur "server" for item in self.srv_tree.get_children(): vals = self.srv_tree.item(item, "values") if vals and len(vals) > 2 and vals[2] == s["server"]: iid = item s["_iid"] = iid # mémoriser pour la prochaine fois break else: return # serveur pas dans l'arbre (filtré) accord_ok = s["accord"] == "oui" sel_mark = "✓" if s.get("selected") else "✗" snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "—") date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else "—" tag = "selected" if s.get("selected") else "normal" self.srv_tree.item(iid, values=( sel_mark, "✅ OUI" if accord_ok else "❌ NON", s["server"], s.get("patcher",""), s["env"], s["domain"], s["app"], date_str, snap_lbl, s["status"]), tags=(tag,)) def _select_all_ok(self): for s in self.servers: if s["accord"] == "oui" and self.srv_tree.exists(s["server"]): s["selected"] = True self._refresh_tree_item(s) n = sum(1 for s in self.servers if s.get("selected")) self.count_lbl.configure(text=f"— {n} sélectionnés") def _select_none(self): for s in self.servers: s["selected"] = False if self.srv_tree.exists(s["server"]): self._refresh_tree_item(s) self.count_lbl.configure(text="— 0 sélectionnés") def _invert(self): for s in self.servers: if s["accord"] == "oui" and self.srv_tree.exists(s["server"]): s["selected"] = not s.get("selected", False) self._refresh_tree_item(s) n = sum(1 for s in self.servers if s.get("selected")) self.count_lbl.configure(text=f"— {n} sélectionnés") # ========================================================================== # SNAPSHOT VSPHERE # ========================================================================== def _prompt_vcenter_and_check_snaps(self): """Prompt vCenter puis vérifie les snapshots des serveurs sélectionnés avec accord.""" accord_servers = [s for s in self.servers if s.get("selected") and s.get("accord") == "oui" and not s.get("is_physical")] if not accord_servers: return # Si identifiants déjà en settings, utiliser directement vs_user = self.settings.get("vs_user", "").strip() vs_pwd = self.settings.get("_vs_pwd_session", "").strip() if vs_user and vs_pwd: self._log_patch( f"Vérification snapshots ({len(accord_servers)} serveurs avec accord)...", "info") threading.Thread(target=self._check_snap_worker, args=(accord_servers, vs_user, vs_pwd), daemon=True).start() return # Sinon : prompt identifiants dlg = tk.Toplevel(self.root) dlg.title("vSphere — Vérification snapshots") dlg.configure(bg=self.BG) dlg.grab_set() dlg.resizable(False, False) self._center_dialog(dlg, 400, 230) tk.Label(dlg, text="Connexion vCenter", bg=self.BG, fg=self.ACCENT, font=("Consolas",12,"bold")).pack(pady=10) msg_lbl = f"Vérification snapshots pour {len(accord_servers)} serveur(s) avec accord." tk.Label(dlg, text=msg_lbl, bg=self.BG, fg=self.FG, font=("Consolas",9), justify="center").pack(pady=(0,8)) for lbl, attr, show in [ ("Utilisateur :", "dlg_vs_user", ""), ("Mot de passe :", "dlg_vs_pwd", "●") ]: f = tk.Frame(dlg, bg=self.BG); f.pack(fill="x", padx=24, pady=4) tk.Label(f, text=lbl, bg=self.BG, fg=self.FG, font=("Consolas",10), width=14).pack(side="left") var = tk.StringVar(value=self.settings.get("vs_user", "") if "Util" in lbl else "") setattr(self, attr, var) tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG, font=("Consolas",10), insertbackground=self.FG, width=26).pack(side="left", padx=4) def do_check(): user = self.dlg_vs_user.get().strip() pwd = self.dlg_vs_pwd.get().strip() if not user or not pwd: messagebox.showwarning("Attention", "Identifiants requis.", parent=dlg) return # Sauvegarder en settings pour ne pas re-demander self.settings["vs_user"] = user self.settings["_vs_pwd_session"] = pwd save_settings(self.settings) dlg.destroy() self._log_patch( f"Vérification snapshots ({len(accord_servers)} serveurs avec accord)...", "info") threading.Thread(target=self._check_snap_worker, args=(accord_servers, user, pwd), daemon=True).start() def skip(): dlg.destroy() self._log_patch("Vérification snapshot ignorée.", "warn") bf = tk.Frame(dlg, bg=self.BG); bf.pack(pady=10) tk.Button(bf, text="🔍 Vérifier", bg=self.ACCENT, fg="white", font=("Consolas",11,"bold"), padx=10, pady=4, command=do_check).pack(side="left", padx=6) tk.Button(bf, text="Ignorer", bg=self.BG2, fg=self.FG, font=("Consolas",10), padx=10, pady=4, command=skip).pack(side="left", padx=6) dlg.bind("", lambda e: do_check()) def _check_snap_worker(self, servers, vs_user, vs_pwd): """Thread : vérifie l'existence d'un snapshot PrePatch sur les vCenters.""" snap_prefix = "PrePatch_" def has_prepatch_snap(vm): """Retourne True si la VM a un snapshot dont le nom commence par PrePatch_""" if not vm.snapshot or not vm.snapshot.rootSnapshotList: return False def walk(snap_list): for snap in snap_list: if snap.name.startswith(snap_prefix): return True if snap.childSnapshotList and walk(snap.childSnapshotList): return True return False return walk(vm.snapshot.rootSnapshotList) for vc in VSPHERE_HOSTS: # Serveurs pas encore trouvés pending = [s for s in servers if not s.get("snap_done")] if not pending: break try: import ssl ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx) vc_content = si.RetrieveContent() for s in pending: vm = self._find_vm(vc_content, s["server"]) if not vm: continue if has_prepatch_snap(vm): s["snap_done"] = True self._log_patch(f" ✅ Snapshot existant : {s['server']}", "ok") self.root.after(0, lambda sv=s: self._refresh_tree_item(sv)) else: self._log_patch(f" ⚠ Pas de snapshot : {s['server']}", "warn") Disconnect(si) except Exception as e: self._log_patch(f" Erreur vCenter {vc} : {e}", "ko") # Bilan ok = sum(1 for s in servers if s.get("snap_done")) ko = len(servers) - ok self._log_patch( f"Snapshots : {ok} trouvé(s), {ko} manquant(s) sur {len(servers)} serveur(s) avec accord", "ok" if ko == 0 else "warn") def _center_dialog(self, dlg, w, h): """Centre un dialog sur le meme ecran que la fenetre principale.""" center_window(dlg, w, h, parent=self.root) def _take_snapshots(self): selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not selected: messagebox.showwarning("Attention", "Aucun serveur sélectionné (avec accord).") return if not VSPHERE_OK: messagebox.showwarning("pyVmomi manquant", "pyVmomi non installé.\n\n" "Installer avec :\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080\n\n" "Après installation, relancer l'application.") return # Prompt vSphere dlg = tk.Toplevel(self.root) dlg.title("Identifiants vSphere") dlg.configure(bg=self.BG) dlg.grab_set() dlg.resizable(False, False) tk.Label(dlg, text="Connexion vSphere", bg=self.BG, fg=self.ACCENT, font=("Consolas",12,"bold")).pack(pady=10) for lbl, attr, show, default_key in [ ("Utilisateur :", "vs_user", "", "vs_user"), ("Mot de passe :", "vs_pwd", "\u25cf", "") ]: f = tk.Frame(dlg, bg=self.BG); f.pack(fill="x", padx=20, pady=4) tk.Label(f, text=lbl, bg=self.BG, fg=self.FG, font=("Consolas",10), width=14).pack(side="left") var = tk.StringVar(value=self.settings.get(default_key, "")) setattr(self, attr, var) tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG, font=("Consolas",10), insertbackground=self.FG).pack(side="left", fill="x", expand=True) center_window(dlg, 400, 200, parent=self.root) def do_snap(): user = self.vs_user.get().strip() pwd = self.vs_pwd.get().strip() dlg.destroy() threading.Thread(target=self._snap_worker, args=(selected, user, pwd), daemon=True).start() tk.Button(dlg, text="📸 Prendre snapshots", bg="#e64553", fg="white", font=("Consolas",11,"bold"), pady=6, command=do_snap).pack(pady=10) def _snap_worker(self, servers, vs_user, vs_pwd): snap_name = f"PrePatch_{date.today():%Y%m%d}" # Travailler sur les serveurs pas encore snapshotés pending = [s for s in servers if not s.get("snap_done")] for vc in VSPHERE_HOSTS: # Si tous les serveurs sont déjà snapshotés, inutile de continuer pending = [s for s in pending if not s.get("snap_done")] if not pending: break self._log_patch(f"Connexion vCenter : {vc}", "info") try: import ssl ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx) content = si.RetrieveContent() for s in pending: if s.get("snap_done"): continue # déjà fait sur un vCenter précédent vm = self._find_vm(content, s["server"]) if vm: try: task = vm.CreateSnapshot_Task( name=snap_name, description=f"Pre-patch automatique {date.today()}", memory=False, quiesce=False) while task.info.state not in ("success","error"): time.sleep(1) if task.info.state == "success": s["snap_done"] = True self._log_patch(f" ✅ Snapshot OK : {s['server']}", "ok") # Rafraîchir l'UI immédiatement (ne pas attendre la fin) self.root.after(0, lambda sv=s: self._refresh_tree_item(sv)) else: self._log_patch(f" ❌ Snapshot KO : {s['server']} ({task.info.error})", "ko") except Exception as e: self._log_patch(f" ❌ {s['server']} : {e}", "ko") # Ne pas logguer "VM non trouvée" ici — la VM peut être sur un autre vCenter Disconnect(si) except Exception as e: self._log_patch(f" Erreur vCenter {vc} : {e}", "ko") # Bilan final ok = [s for s in servers if s.get("snap_done")] ko = [s for s in servers if not s.get("snap_done") and not s.get("is_physical")] for s in ko: self._log_patch(f" ⚠ VM non trouvée sur aucun vCenter : {s['server']}", "warn") self._log_patch( f"Snapshots terminés : {len(ok)}/{len(servers)} OK", "ok" if not ko else "warn") # Rafraîchir l'arbre complet à la fin self.root.after(0, lambda: [self._refresh_tree_item(s) for s in servers]) def _find_vm(self, content, name): """Cherche une VM par nom exact ou partiel dans tout le vCenter""" name_short = name.split(".")[0].lower() # Sans le domaine def search_folder(folder): try: for obj in folder.childEntity: if hasattr(obj, "childEntity"): result = search_folder(obj) if result: return result if hasattr(obj, "name"): vm_name = obj.name.lower() if vm_name == name.lower() or vm_name == name_short: return obj if name_short in vm_name or vm_name in name_short: return obj except Exception: pass return None try: for dc in content.rootFolder.childEntity: result = search_folder(dc.vmFolder) if result: return result except Exception: pass return None # ========================================================================== # ONGLET 2 — LOGIQUE PATCH # ========================================================================== def _on_preset(self, event=None): val = PRESET_PACKAGES.get(self.preset_var.get(), "") if val == "CUSTOM": # Mode personnalise — activer les champs self.packages_entry.configure(state="normal") self.custom_excludes_entry.configure(state="normal") self.packages_var.set("") self.custom_excludes_var.set("") else: self.packages_var.set(val) self.custom_excludes_var.set("") self._preview_cmd() def _preview_cmd(self): """Genere la commande yum et l'affiche dans le champ editable""" preset = self.preset_var.get() packages = self.packages_var.get().strip() custom_excl = self.custom_excludes_var.get().strip() exclude_kernel = self.exclude_kernel_var.get() dryrun = self.dryrun_var.get() if preset == "Personnalise..." or custom_excl: # Mode personnalise if packages and not custom_excl: # Packages specifiques a patcher uniquement if dryrun: cmd = f"sudo yum check-update {packages} 2>/dev/null | head -30" else: cmd = f"sudo yum update -y {packages} 2>&1 | tail -25" elif custom_excl: # Excludes personnalises excl_parts = " ".join(f"--exclude={e.strip()}" for e in custom_excl.split() if e.strip()) if exclude_kernel and "--exclude=kernel" not in excl_parts.lower(): excl_parts += " --exclude=*kernel*" if packages: if dryrun: cmd = f"sudo yum check-update {packages} {excl_parts} 2>/dev/null | head -30" else: cmd = f"sudo yum update -y {packages} {excl_parts} 2>&1 | tail -25" else: if dryrun: cmd = f"sudo yum check-update {excl_parts} 2>/dev/null | head -30" else: cmd = f"sudo yum update -y {excl_parts} 2>&1 | tail -25" else: cmd = "sudo yum check-update 2>/dev/null | head -30" if dryrun else "sudo yum update -y 2>&1 | tail -25" else: # Mode preset standard — utilise build_yum_command cmd = build_yum_command("standard", packages, exclude_kernel, dryrun) cmd = "sudo " + cmd if not cmd.startswith("sudo") else cmd self.final_cmd_var.set(cmd) def _go_patch(self): # Bloquer si pas la semaine courante if not self.is_current_week: messagebox.showwarning( "Patch desactive", "Vous consultez une autre semaine que la semaine courante.\n\n" "Le patch n'est autorise que sur la semaine en cours.\n" "Utilisez le bouton ↺ pour revenir a la semaine courante.") return selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not selected: messagebox.showwarning("Attention", "Aucun serveur sélectionné avec accord.") return # Vérifier snapshot pour les VMs no_snap = [s for s in selected if not s.get("snap_done") and not s.get("is_physical")] if no_snap: names = "\n".join(s["server"] for s in no_snap[:8]) ans = messagebox.askyesno( "⚠️ Snapshot manquant", f"{len(no_snap)} serveur(s) sans snapshot :\n{names}\n\n" "Continuer SANS snapshot ?", icon="warning") if not ans: return if not self.dryrun_var.get(): if not messagebox.askyesno( "Confirmation PATCH RÉEL", f"Lancer le PATCH RÉEL sur {len(selected)} serveur(s) ?\n\n" "Cette action est irréversible !", icon="warning"): return self.cyb_pwd = self.cybpwd_var.get().strip() or None self.running = True self.apply_all_cmd = False self.results = [] self.go_btn.configure(state="disabled") self.stop_btn.configure(state="normal") threading.Thread( target=self._patch_worker, args=(selected,), daemon=True).start() def _stop(self): self.running = False self._log_patch("⏹ Arrêt demandé...", "warn") def _patch_worker(self, servers): total = len(servers) packages = self.packages_var.get().strip() exclude_kernel = self.exclude_kernel_var.get() dryrun = self.dryrun_var.get() approved_cmd = None # commande approuvée pour "appliquer à tous" for idx, s in enumerate(servers): if not self.running: break server = s["server"] self._log_patch(f"\n[{idx+1}/{total}] ── {server} ──", "info") # Construire la commande yum yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel, dryrun) # Confirmation commande (sauf si "appliquer à tous" déjà validé) if not self.apply_all_cmd: event = threading.Event() dialog_result = [None] def show_dialog(cmd=yum_cmd, srv=server, ev=event, res=dialog_result): dlg = YumConfirmDialog(self.root, srv, cmd) res[0] = dlg.result if dlg.result in ("apply","apply_all"): yum_cmd_final = dlg.final_cmd res.append(yum_cmd_final) ev.set() self.root.after(0, show_dialog) event.wait() action = dialog_result[0] if action == "cancel": self._log_patch(f" ↩ Serveur ignoré par l'utilisateur", "warn") s["status"] = "IGNORE" self._update_tree_status(s) continue elif action == "apply_all": self.apply_all_cmd = True if len(dialog_result) > 1: yum_cmd = dialog_result[1] approved_cmd = yum_cmd elif action == "apply": if len(dialog_result) > 1: yum_cmd = dialog_result[1] else: if approved_cmd: yum_cmd = approved_cmd self._log_patch(f" Connexion...", "info") t0 = time.time() client, eff_host = ssh_connect( server, s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if client is None: dur = round(time.time() - t0, 1) self._log_patch(f" ❌ Connexion impossible ({dur}s)", "ko") s["status"] = "AUTH_KO" self.results.append({**s, "duration": f"{dur}s", "patch_status":"KO","patch_detail":"Connexion impossible"}) self._update_tree_status(s) self._update_progress(idx+1, total) continue self._log_patch(f" ✅ Connecté : {eff_host}", "ok") # Détection physique s["is_physical"] = detect_physical(client) if s["is_physical"]: self._log_patch(f" 🖥 Serveur physique détecté", "warn") # Pré-patching self._log_patch(f" Pre-patching...", "info") pre_out, _ = run_cmd(client, PRE_PATCH_SCRIPT, 30) if "PRE_PATCH_OK" in pre_out: self._log_patch(f" Pre-patch OK", "ok") else: self._log_patch(f" Pre-patch incomplet", "warn") # Flux Libre — snapshot pods Podman is_fl = "flux libre" in s.get("domain", "").lower() if is_fl: self._log_patch(f" Flux Libre : snapshot pods Podman...", "info") fl_out, _ = run_cmd(client, FL_PRE_PATCH_SCRIPT, 30) if "FL_PRE_OK" in fl_out: for line in fl_out.splitlines(): if line.startswith("FL_USER=") or line.startswith("FL_PODS_SAVED="): self._log_patch(f" {line}", "ok") elif "FL_NO_PODS" in fl_out: self._log_patch(f" Pas de pods Podman detectes", "info") else: self._log_patch(f" Snapshot pods incomplet", "warn") # Snapshot versions avant ver_before = {} if packages: for pkg in packages.split(): out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15) ver_before[pkg] = out.strip() # Exécution yum self._log_patch(f" 🔧 Commande : {yum_cmd[:80]}...", "cmd") yum_out, yum_err = run_cmd(client, yum_cmd, 600) if yum_out: for line in yum_out.splitlines()[-10:]: self._log_patch(f" {line}", "info") # Snapshot versions après ver_after = {} if packages: for pkg in packages.split(): out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15) ver_after[pkg] = out.strip() dur = round(time.time() - t0, 1) # Déterminer statut if dryrun: pkg_lines = [l for l in yum_out.splitlines() if l.split() and "." in l.split()[0] and not any(x in l for x in ["kB","bps","00:","Updat"])] if pkg_lines: s["patch_status"] = "UPDATE_AVAIL" s["patch_detail"] = " | ".join(pkg_lines)[:200] self._log_patch(f" ⚠ Updates disponibles : {len(pkg_lines)} package(s)", "warn") else: s["patch_status"] = "UP_TO_DATE" s["patch_detail"] = "Deja a jour" self._log_patch(f" ✅ A jour", "ok") else: updated = [p for p in (packages.split() if packages else []) if ver_before.get(p) != ver_after.get(p)] if updated or "Complete!" in yum_out: s["patch_status"] = "PATCHED" s["patch_detail"] = " | ".join( f"{p}: {ver_before.get(p,'?')} -> {ver_after.get(p,'?')}" for p in updated)[:200] if updated else "Packages mis a jour" self._log_patch(f" ✅ PATCHÉ : {s['patch_detail'][:80]}", "ok") elif "Nothing to do" in yum_out or "No packages" in yum_out: s["patch_status"] = "UP_TO_DATE" s["patch_detail"] = "Deja a jour" self._log_patch(f" ✅ Déjà à jour", "ok") else: s["patch_status"] = "UP_TO_DATE" s["patch_detail"] = "Deja a jour" # Vérifier reboot nécessaire reboot_out, _ = run_cmd(client, "needs-restarting -r 2>/dev/null; echo RC:$?", 15) if "RC:1" in reboot_out: s["reboot_required"] = True self._log_patch(f" ⚠ REBOOT REQUIS", "warn") else: s["reboot_required"] = False s["status"] = "OK" s["duration"] = f"{dur}s" client.close() self.results.append(deepcopy(s)) self._update_tree_status(s) self._update_progress(idx+1, total) # Fin self.running = False self.root.after(0, lambda: [ self.go_btn.configure(state="normal"), self.stop_btn.configure(state="disabled"), self._patch_done_summary() ]) def _update_tree_status(self, s): def _do(): if not self.srv_tree.exists(s["server"]): return vals = list(self.srv_tree.item(s["server"], "values")) vals[9] = s.get("status", "—") # statut est maintenant col 9 tag = ("ok" if s.get("status")=="OK" and s.get("patch_status") not in ("KO",) else "ko" if s.get("status")=="AUTH_KO" else "normal") self.srv_tree.item(s["server"], values=vals, tags=(tag,)) self.root.after(0, _do) def _update_progress(self, done, total): pct = (done/total)*100 self.root.after(0, lambda: [ self.prog_var.set(pct), self.prog_lbl.configure(text=f"{done}/{total} ({pct:.0f}%)")]) def _patch_done_summary(self): ok_c = sum(1 for r in self.results if r.get("status")=="OK") p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED") rb_c = sum(1 for r in self.results if r.get("reboot_required")) ko_c = sum(1 for r in self.results if r.get("status")!="OK") msg = (f"Patch termine !\n\n" f"Total traite : {len(self.results)}\n" f"OK : {ok_c}\n" f"Patche : {p_c}\n" f"Reboot requis: {rb_c}\n" f"KO/Auth : {ko_c}") messagebox.showinfo("Patch termine", msg) if rb_c > 0: messagebox.showwarning("Reboot requis", f"{rb_c} serveur(s) necessitent un reboot.\n" "Utiliser l'onglet Post-patching > Reboot.") # Auto-générer le rapport dans l'onglet 4 self.root.after(500, self._generate_report) # Basculer sur l'onglet rapport self.root.after(600, lambda: self.nb.select(self.tab_report)) # ========================================================================== # ONGLET 3 — POST-PATCHING # ========================================================================== def _post_patch_check(self): targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not targets: messagebox.showwarning("Attention", "Aucun serveur sélectionné.") return threading.Thread(target=self._post_worker, args=(targets,), daemon=True).start() def _post_worker(self, servers): for s in servers: self._log_post(f"\n── {s['server']} ──", "info") client, eff = ssh_connect(s["server"], s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(f" ❌ Connexion impossible", "ko") continue out, _ = run_cmd(client, POST_PATCH_SCRIPT, 60) for line in out.splitlines(): if "ALERTE" in line or "disparu" in line.lower(): self._log_post(f" {line}", "ko") elif "WARN" in line or "nouveau" in line.lower(): self._log_post(f" {line}", "warn") elif "OK" in line: self._log_post(f" {line}", "ok") else: self._log_post(f" {line}", "info") # Flux Libre — verifier et redemarrer les pods is_fl = "flux libre" in s.get("domain", "").lower() if is_fl: self._log_post(f" Flux Libre : verification pods Podman...", "info") fl_out, _ = run_cmd(client, FL_POST_PATCH_SCRIPT, 60) for line in fl_out.splitlines(): if "FL_RESTART=" in line: pod = line.split("=", 1)[1] self._log_post(f" Redemarrage pod: {pod}", "warn") elif "FL_ALL_OK" in line: self._log_post(f" Tous les pods sont actifs", "ok") elif "FL_RESTARTED=" in line: n = line.split("=", 1)[1] self._log_post(f" {n} pod(s) redemarres", "warn") elif "FL_USER=" in line: self._log_post(f" {line}", "info") elif "FL_NO_PODS" in line: self._log_post(f" Pas de pods Podman", "info") client.close() def _ask_mark_patched(self): """Demande confirmation puis marque les serveurs patchés en vert dans Excel""" patched = [s for s in self.servers if s.get("patch_status") == "PATCHED" and s.get("selected")] if not patched: patched = [r for r in self.results if r.get("patch_status") == "PATCHED"] if not patched: messagebox.showwarning("Attention", "Aucun serveur patche detecte.\n" "Selectionner manuellement les serveurs a marquer.") patched = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not patched: return names = "\n".join(s["server"] for s in patched[:10]) if len(patched) > 10: names += f"\n... et {len(patched)-10} autres" if messagebox.askyesno( "Marquer comme patches dans Excel", f"Marquer {len(patched)} serveur(s) en VERT dans le fichier Excel ?\n\n{names}\n\nCette action modifie le fichier Excel original.", icon="question"): self._mark_patched_excel(patched) def _delete_old_snapshots(self): """Supprime les snapshots de moins de 3 jours sur les serveurs sélectionnés""" if not VSPHERE_OK: messagebox.showwarning("pyVmomi manquant", "pyVmomi non installe.\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080") return selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not selected: messagebox.showwarning("Attention", "Aucun serveur selectionne.") return # Prompt vSphere dlg = tk.Toplevel(self.root) dlg.title("Supprimer snapshots vSphere") dlg.geometry("400x220") dlg.configure(bg=self.BG) dlg.grab_set() tk.Label(dlg, text="Supprimer snapshots > 3 jours", bg=self.BG, fg=self.ACCENT, font=("Consolas",12,"bold")).pack(pady=10) for lbl, attr, show in [("Utilisateur :", "vsd_user", ""), ("Mot de passe :", "vsd_pwd", "●")]: f = tk.Frame(dlg, bg=self.BG); f.pack(fill="x", padx=20, pady=4) tk.Label(f, text=lbl, bg=self.BG, fg=self.FG, font=("Consolas",10), width=14).pack(side="left") var = tk.StringVar(); setattr(self, attr, var) tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG, font=("Consolas",10), insertbackground=self.FG).pack(side="left", fill="x", expand=True) days_var = tk.IntVar(value=3) fd = tk.Frame(dlg, bg=self.BG); fd.pack(fill="x", padx=20, pady=4) tk.Label(fd, text="Supprimer snaps < (jours) :", bg=self.BG, fg=self.FG, font=("Consolas",10), width=24).pack(side="left") tk.Spinbox(fd, from_=1, to=30, textvariable=days_var, width=5, bg=self.BG2, fg=self.FG).pack(side="left") def do_delete(): user = self.vsd_user.get().strip() pwd = self.vsd_pwd.get().strip() days = days_var.get() dlg.destroy() threading.Thread(target=self._snap_delete_worker, args=(selected, user, pwd, days), daemon=True).start() tk.Button(dlg, text="🗑 Supprimer", bg="#e64553", fg="white", font=("Consolas",11,"bold"), pady=6, command=do_delete).pack(pady=10) def _snap_delete_worker(self, servers, vs_user, vs_pwd, max_days): """Supprime les snapshots PrePatch vieux de PLUS de max_days jours (garde les récents)""" from datetime import timezone cutoff = datetime.now(tz=timezone.utc) - __import__("datetime").timedelta(days=max_days) connected_vcs = [] for vc in VSPHERE_HOSTS: try: import ssl ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx, connectionPoolTimeout=10) vc_content = si.RetrieveContent() self._log_post(f"Connecte vCenter : {vc}", "ok") connected_vcs.append((vc, si, vc_content)) except Exception as e: self._log_post(f"Inaccessible : {vc} — {str(e)[:50]}", "ko") if not connected_vcs: self._log_post("Aucun vCenter joignable", "ko") return for s in servers: for vc, si, vc_content in connected_vcs: vm = self._find_vm(vc_content, s["server"]) if not vm: continue self._log_post(f"\n VM : {vm.name} sur {vc}", "info") def get_snapshots(snap_list): snaps = [] for snap in snap_list: snaps.append(snap) snaps.extend(get_snapshots(snap.childSnapshotList)) return snaps if not vm.snapshot: self._log_post(f" Aucun snapshot", "info") continue all_snaps = get_snapshots(vm.snapshot.rootSnapshotList) for snap in all_snaps: snap_time = snap.createTime if snap_time.tzinfo is None: snap_time = snap_time.replace(tzinfo=timezone.utc) age_days = (datetime.now(tz=timezone.utc) - snap_time).days if age_days > max_days: # Vieux de plus de max_days jours → supprimer self._log_post( f" Snapshot : {snap.name} ({age_days}j) — SUPPRESSION (>{max_days}j)", "warn") try: task = snap.snapshot.RemoveSnapshot_Task(removeChildren=False) while task.info.state not in ("success","error"): time.sleep(1) if task.info.state == "success": self._log_post(f" OK supprime : {snap.name}", "ok") else: self._log_post(f" ERREUR suppression : {snap.name}", "ko") except Exception as e: self._log_post(f" ERREUR : {e}", "ko") else: # Récent (≤ max_days jours) → conserver self._log_post( f" Snapshot : {snap.name} ({age_days}j) — conservé (<= {max_days}j)", "info") break for vc, si, _ in connected_vcs: try: Disconnect(si) except: pass def _remove_old_kernels(self): targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not targets: messagebox.showwarning("Attention", "Aucun serveur sélectionné.") return if not messagebox.askyesno("Confirmation", f"Supprimer les anciens kernels sur {len(targets)} serveur(s) ?"): return def worker(): for s in targets: self._log_post(f"\n── {s['server']} — Suppression anciens kernels ──", "info") client, _ = ssh_connect(s["server"], s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(f" ❌ Connexion impossible", "ko") continue out, err = run_cmd(client, "sudo package-cleanup --oldkernels --count=1 -y 2>&1 | tail -10", 60) for line in out.splitlines(): self._log_post(f" {line}", "ok" if "Removed" in line else "info") client.close() threading.Thread(target=worker, daemon=True).start() def _undo_yum(self): self.audit("UNDO_YUM", "") target = None for s in self.servers: if s.get("selected") and s["accord"]=="oui": target = s break if not target: messagebox.showwarning("Attention", "Sélectionner au moins un serveur.") return self._log_post(f"\n── {target['server']} — Historique yum ──", "info") def worker(): client, _ = ssh_connect(target["server"], target["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(" ❌ Connexion impossible", "ko") return out, _ = run_cmd(client, "sudo yum history list 2>/dev/null | head -20", 20) client.close() self._log_post(out, "info") # Demander l'ID à annuler def ask(): tid = simpledialog.askstring("Undo yum", f"Historique yum sur {target['server']} :\n\n{out}\n\nID de transaction à annuler :") if tid: threading.Thread(target=do_undo, args=(tid,), daemon=True).start() def do_undo(tid): self._log_post(f" Undo transaction {tid}...", "warn") c2, _ = ssh_connect(target["server"], target["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not c2: self._log_post(" ❌ Connexion impossible", "ko") return out2, _ = run_cmd(c2, f"sudo yum history undo {tid} -y 2>&1 | tail -15", 120) for line in out2.splitlines(): self._log_post(f" {line}", "ok" if "Complete" in line else "info") c2.close() self.root.after(0, ask) threading.Thread(target=worker, daemon=True).start() def _reboot_servers(self): self.audit("REBOOT_START", "") reboot_list = [s for s in self.servers if s.get("selected") and s.get("reboot_required") and s["accord"]=="oui"] if not reboot_list: reboot_list = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"] if not reboot_list: messagebox.showwarning("Attention", "Aucun serveur sélectionné.") return names = "\n".join(s["server"] for s in reboot_list[:10]) if not messagebox.askyesno("⚠️ Confirmation reboot", f"Redémarrer {len(reboot_list)} serveur(s) ?\n\n{names}", icon="warning"): return def worker(): for s in reboot_list: self._log_post(f"\n── {s['server']} — Reboot ──", "warn") client, _ = ssh_connect(s["server"], s["env"], self.settings, self.pkey, self.pkey2, self.cyb_pwd) if not client: self._log_post(" ❌ Connexion impossible", "ko"); continue run_cmd(client, "sudo shutdown -r +1 'Reboot post-patching' &", 10) client.close() self._log_post(f" ✅ Reboot planifié dans 1 min", "ok") threading.Thread(target=worker, daemon=True).start() # ========================================================================== # ONGLET 4 — RAPPORT # ========================================================================== def _generate_report(self): """Génère le rapport visuel — KPI widgets + tableau détaillé + log brut""" if not self.results: messagebox.showwarning("Attention", "Aucun résultat de patch disponible.") return now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") week = date.today().isocalendar()[1] # ── Calcul des KPI ──────────────────────────────────────────────────── total = len(self.results) ok_c = sum(1 for r in self.results if r.get("status")=="OK") p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED") uptodate = sum(1 for r in self.results if r.get("patch_status") in ("UP_TO_DATE","UP_TO_DATE")) rb_c = sum(1 for r in self.results if r.get("reboot_required")) ko_c = sum(1 for r in self.results if r.get("status")!="OK") # Mettre à jour les widgets KPI self.kpi_widgets["total" ].configure(text=str(total)) self.kpi_widgets["ok" ].configure(text=str(ok_c)) self.kpi_widgets["patched" ].configure(text=str(p_c)) self.kpi_widgets["uptodate"].configure(text=str(uptodate)) self.kpi_widgets["reboot" ].configure(text=str(rb_c)) self.kpi_widgets["ko" ].configure(text=str(ko_c)) # ── Peupler le tableau détaillé ─────────────────────────────────────── for item in self.report_tree.get_children(): self.report_tree.delete(item) for r in self.results: accord = "OUI" if r.get("accord")=="oui" else "NON" snap = "OK" if r.get("snap_done") else ("PHY" if r.get("is_physical") else "—") conn = r.get("status","?") ps = r.get("patch_status","?") rb = "OUI" if r.get("reboot_required") else "—" detail = r.get("patch_detail","")[:60] # Choisir le tag couleur if r.get("status") != "OK": tag = "ko" elif r.get("patch_status") == "PATCHED": tag = "patched" elif r.get("reboot_required"): tag = "reboot" elif r.get("patch_status") in ("UP_TO_DATE",): tag = "uptodate" else: tag = "ok" self.report_tree.insert("", "end", values=( r.get("server",""), r.get("env",""), r.get("domain",""), accord, snap, conn, ps, rb, detail, ), tags=(tag,)) # ── Log brut ────────────────────────────────────────────────────────── self.report_txt.configure(state="normal") self.report_txt.delete("1.0", "end") def w(txt, tag="info"): self.report_txt.insert("end", txt+"\n", tag) w(f"{'='*70}", "title") w(f" RAPPORT PATCHING — Semaine {week} — {now}", "title") w(f" Fichier : {self.excel_var.get()}", "info") w(f" Intervenant : {self.settings.get('patcher','—')}", "info") w(f"{'='*70}", "title") w("") w(f" RESUME : {total} traites | {ok_c} OK | {p_c} patches | {rb_c} reboot | {ko_c} KO", "info") w("") w(f" {'SERVEUR':<35} {'STATUT':<12} {'PATCH':<16} {'REBOOT'}", "title") w(f" {'-'*66}", "info") for r in self.results: status = r.get("status","?") ps = r.get("patch_status","?") rb = "REBOOT" if r.get("reboot_required") else "" tag = "ok" if status=="OK" else "ko" if r.get("patch_status") == "PATCHED": tag = "ok" if r.get("reboot_required"): tag = "warn" w(f" {r.get('server',''):<35} {status:<12} {ps:<16} {rb}", tag) if r.get("patch_detail"): w(f" → {r['patch_detail'][:80]}", "info") w("") w(f"{'='*70}", "title") self.report_txt.configure(state="disabled") def _mark_patched_excel(self, servers_to_mark): """Met le fond vert sur les lignes patchees dans le fichier Excel. Utilise une copie temporaire pour eviter les conflits OneDrive/verrou.""" import shutil import tempfile filepath = self.excel_var.get().strip() sheet = self.sheet_var.get() if not filepath or not os.path.exists(filepath): messagebox.showerror("Erreur Excel", "Fichier Excel introuvable") return try: from openpyxl import load_workbook from openpyxl.styles import PatternFill # Copie temporaire pour eviter le verrou OneDrive tmp_dir = tempfile.mkdtemp() tmp_path = os.path.join(tmp_dir, os.path.basename(filepath)) shutil.copy2(filepath, tmp_path) wb = load_workbook(tmp_path) ws = wb[sheet] green_fill = PatternFill(start_color="00B050", end_color="00B050", fill_type="solid") marked = 0 for s in servers_to_mark: row_num = s.get("excel_row") if not row_num: for row in ws.iter_rows(): if row[0].value == s["server"]: row_num = row[0].row break if row_num: for col in range(1, ws.max_column + 1): ws.cell(row=row_num, column=col).fill = green_fill marked += 1 # Sauvegarder dans le temp wb.save(tmp_path) wb.close() # Tenter de recopier vers l'original try: shutil.copy2(tmp_path, filepath) self._log_patch(f" {marked} ligne(s) marquee(s) en vert dans Excel", "ok") messagebox.showinfo("Excel mis a jour", f"{marked} serveur(s) marques comme patches (fond vert) dans :\n{filepath}") except PermissionError: # OneDrive verrouille le fichier — sauvegarder a cote backup_path = filepath.replace(".xlsx", f"_patched_{datetime.now():%Y%m%d_%H%M}.xlsx") shutil.copy2(tmp_path, backup_path) self._log_patch(f" Excel verrouille, sauvegarde dans : {backup_path}", "warn") messagebox.showwarning("Excel verrouille", f"Le fichier Excel est verrouille (OneDrive).\n\n" f"Une copie a ete sauvegardee :\n{backup_path}\n\n" f"Fermez le fichier original puis copiez la version patchee.") # Nettoyage temp try: os.remove(tmp_path) os.rmdir(tmp_dir) except Exception: pass self.audit("MARK_EXCEL", f"{marked} serveurs marques, sheet={sheet}") except Exception as e: messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}") def _export_csv(self): if not self.results: messagebox.showwarning("Attention","Aucun résultat.") return rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports") os.makedirs(rep_dir, exist_ok=True) csv_file = os.path.join(rep_dir, f"patch_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d_%H%M%S}.csv") fields = ["server","env","domain","app","accord","status","patch_status", "patch_detail","reboot_required","snap_done","is_physical"] with open(csv_file,"w",newline="",encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore") w.writeheader(); w.writerows(self.results) messagebox.showinfo("Export CSV", f"Fichier : {csv_file}") def _export_dokuwiki(self): if not self.results: messagebox.showwarning("Attention","Aucun résultat.") return rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports") os.makedirs(rep_dir, exist_ok=True) wiki_file = os.path.join(rep_dir, f"patch_wiki_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d}.txt") week = date.today().isocalendar()[1] with open(wiki_file,"w",encoding="utf-8") as f: f.write(f"====== Rapport Patching — Semaine {week} ======\n\n") f.write(f"//Généré le {datetime.now():%d/%m/%Y %H:%M} — MYPCZEN / SANEF DSI-SOC//\n\n") f.write("===== Résumé =====\n\n") f.write(f"^ Serveurs traités ^ Patchés ^ Reboot requis ^ KO ^\n") ok_c = sum(1 for r in self.results if r.get("status")=="OK") p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED") rb_c = sum(1 for r in self.results if r.get("reboot_required")) ko_c = sum(1 for r in self.results if r.get("status")!="OK") f.write(f"| {len(self.results)} | {p_c} | {rb_c} | {ko_c} |\n\n") f.write("===== Détail =====\n\n") f.write("^ Serveur ^ Env ^ Statut ^ Patch ^ Reboot ^ Accord ^ Snapshot ^\n") for r in self.results: rb = "⚠ OUI" if r.get("reboot_required") else "NON" f.write(f"| {r['server']} | {r['env']} | {r.get('status','?')} | " f"{r.get('patch_status','?')} | {rb} | " f"{'OUI' if r.get('accord')=='oui' else 'NON'} | " f"{'OUI' if r.get('snap_done') else 'NON'} |\n") messagebox.showinfo("Export DokuWiki", f"Fichier : {wiki_file}") # ========================================================================== # ONGLET 5 — AUDIT # ========================================================================== def _build_tab_audit(self): tab = self.tab_audit BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN hf = tk.Frame(tab, bg="#181825", pady=6) hf.pack(fill="x") tk.Label(hf, text="Journal d'audit", bg="#181825", fg=ACCENT, font=("Consolas", 13, "bold")).pack(side="left", padx=12) tk.Button(hf, text="Rafraichir", bg=BTN, fg=FG, font=("Consolas", 9), command=self._refresh_audit).pack(side="right", padx=12) tk.Button(hf, text="Exporter CSV", bg=BTN, fg=FG, font=("Consolas", 9), command=self._export_audit_csv).pack(side="right", padx=4) tk.Button(hf, text="Changer mon mot de passe", bg="#fab387", fg="#1e1e2e", font=("Consolas", 9), command=self._change_my_password).pack(side="right", padx=4) cols = ("timestamp", "user", "action", "details") self.audit_tree = ttk.Treeview(tab, columns=cols, show="headings") self.audit_tree.heading("timestamp", text="Date/Heure") self.audit_tree.heading("user", text="Utilisateur") self.audit_tree.heading("action", text="Action") self.audit_tree.heading("details", text="Details") self.audit_tree.column("timestamp", width=160) self.audit_tree.column("user", width=120) self.audit_tree.column("action", width=150) self.audit_tree.column("details", width=500) sb = ttk.Scrollbar(tab, orient="vertical", command=self.audit_tree.yview) self.audit_tree.configure(yscrollcommand=sb.set) self.audit_tree.pack(side="left", fill="both", expand=True, padx=6, pady=4) sb.pack(side="right", fill="y", pady=4) self._refresh_audit() def _refresh_audit(self): self.audit_tree.delete(*self.audit_tree.get_children()) for log in self.db.get_logs(500): self.audit_tree.insert("", "end", values=( log["timestamp"], log["username"], log["action"], log.get("details", ""))) def _export_audit_csv(self): logs = self.db.get_logs(5000) if not logs: messagebox.showinfo("Info", "Aucun log"); return p = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV", "*.csv")], initialfile=f"audit_patch_{datetime.now():%Y%m%d_%H%M%S}.csv") if not p: return with open(p, "w", newline="", encoding="utf-8-sig") as f: w = csv.writer(f, delimiter=";") w.writerow(["Timestamp", "Utilisateur", "Action", "Details"]) for l in logs: w.writerow([l["timestamp"], l["username"], l["action"], l.get("details", "")]) self.audit("EXPORT_AUDIT", f"{len(logs)} entries -> {p}") def _change_my_password(self): dlg = ChangePasswordDialog(self.root, self.db, self.current_user["username"]) # Centrer sur le parent if dlg.success: messagebox.showinfo("OK", "Mot de passe modifie avec succes") # ========================================================================== # ONGLET 6 — UTILISATEURS (admin only) # ========================================================================== def _build_tab_users(self): tab = self.tab_users BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN # Creation user cf = tk.Frame(tab, bg="#181825", pady=6) cf.pack(fill="x") tk.Label(cf, text="Gestion des utilisateurs", bg="#181825", fg=ACCENT, font=("Consolas", 13, "bold")).pack(side="left", padx=12) af = tk.Frame(tab, bg=BG, pady=6) af.pack(fill="x", padx=12) tk.Label(af, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(0, 4)) self.new_user_var = tk.StringVar() tk.Entry(af, textvariable=self.new_user_var, bg=BG2, fg=FG, font=("Consolas", 10), width=15, insertbackground=FG).pack(side="left", padx=4) tk.Label(af, text="MDP :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(8, 4)) self.new_pwd_var = tk.StringVar() tk.Entry(af, textvariable=self.new_pwd_var, bg=BG2, fg=FG, font=("Consolas", 10), width=15, show="*", insertbackground=FG).pack(side="left", padx=4) tk.Label(af, text="Role :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(8, 4)) self.new_role_var = tk.StringVar(value="operator") tk.OptionMenu(af, self.new_role_var, "admin", "operator", "viewer").pack(side="left", padx=4) tk.Button(af, text="Creer", bg="#40a02b", fg="white", font=("Consolas", 9, "bold"), command=self._create_user).pack(side="left", padx=8) # Liste users cols = ("username", "role", "locked", "must_change", "last_login") self.users_tree = ttk.Treeview(tab, columns=cols, show="headings") self.users_tree.heading("username", text="Utilisateur") self.users_tree.heading("role", text="Role") self.users_tree.heading("locked", text="Verrouille") self.users_tree.heading("must_change", text="Chg. MDP") self.users_tree.heading("last_login", text="Derniere connexion") self.users_tree.column("username", width=150) self.users_tree.column("role", width=100) self.users_tree.column("locked", width=100) self.users_tree.column("must_change", width=100) self.users_tree.column("last_login", width=180) self.users_tree.pack(fill="both", expand=True, padx=12, pady=6) bf = tk.Frame(tab, bg=BG, pady=6) bf.pack(fill="x", padx=12) tk.Button(bf, text="Supprimer", bg="#f38ba8", fg="#1e1e2e", font=("Consolas", 9, "bold"), command=self._delete_user).pack(side="left", padx=4) tk.Button(bf, text="Deverrouiller", bg="#fab387", fg="#1e1e2e", font=("Consolas", 9, "bold"), command=self._unlock_user).pack(side="left", padx=4) tk.Button(bf, text="Reset MDP", bg=BTN, fg=FG, font=("Consolas", 9), command=self._reset_user_pwd).pack(side="left", padx=4) tk.Button(bf, text="Rafraichir", bg=BTN, fg=FG, font=("Consolas", 9), command=self._refresh_users).pack(side="right", padx=4) self._refresh_users() def _refresh_users(self): self.users_tree.delete(*self.users_tree.get_children()) for u in self.db.list_users(): self.users_tree.insert("", "end", values=( u["username"], u["role"], "OUI" if u["locked"] else "", "OUI" if u["must_change_pwd"] else "", u["last_login"] or "")) def _create_user(self): un = self.new_user_var.get().strip() pw = self.new_pwd_var.get().strip() role = self.new_role_var.get() if not un or not pw: messagebox.showwarning("Erreur", "Saisir un nom et un mot de passe"); return if self.db.create_user(un, pw, role): self.audit("CREATE_USER", f"{un} (role={role})") self.new_user_var.set(""); self.new_pwd_var.set("") self._refresh_users() else: messagebox.showerror("Erreur", f"L'utilisateur '{un}' existe deja") def _delete_user(self): sel = self.users_tree.selection() if not sel: return un = self.users_tree.item(sel[0])["values"][0] if un == "admin": messagebox.showwarning("Erreur", "Impossible de supprimer admin"); return if messagebox.askyesno("Confirmer", f"Supprimer '{un}' ?"): self.db.delete_user(un) self.audit("DELETE_USER", un) self._refresh_users() def _unlock_user(self): sel = self.users_tree.selection() if not sel: return un = self.users_tree.item(sel[0])["values"][0] self.db.unlock_user(un) self.audit("UNLOCK_USER", un) self._refresh_users() def _reset_user_pwd(self): sel = self.users_tree.selection() if not sel: return un = self.users_tree.item(sel[0])["values"][0] new_pwd = simpledialog.askstring("Reset", f"Nouveau mot de passe pour {un} :", show="*") if new_pwd: self.db.reset_password(un, new_pwd) self.audit("RESET_PASSWORD", un) self._refresh_users() # ============================================================================== # MAIN # ============================================================================== if __name__ == "__main__": db = Database() root = tk.Tk() root.withdraw() # Cacher pendant le login login = LoginDialog(root, db) if not login.result: root.destroy() sys.exit(0) user = login.result # Forcer changement de mot de passe au premier login if user.get("must_change_pwd"): dlg = ChangePasswordDialog(root, db, user["username"], forced=True) if not dlg.success: root.destroy() sys.exit(0) root.deiconify() app = PatchManagerV2(root, db, user) root.mainloop()