patchmanagerexev2ok/patch_manager_v2_backup_before_theme.py

3562 lines
161 KiB
Python

#!/usr/bin/env python3
# ==============================================================================
# patch_manager_v2.py — SANEF Linux Patch Manager GUI v2
# Gestion complète du patching Linux via Excel + SSH + vSphere
#
# Prérequis : py -m pip install paramiko openpyxl requests pyVmomi --proxy http://proxy.sanef.fr:8080
# Usage : py patch_manager_v2.py
# Auteur : MYPCZEN / SANEF DSI - SOC
# ==============================================================================
import tkinter as tk
from tkinter import ttk, filedialog, scrolledtext, messagebox, simpledialog
import threading
import paramiko
import socket
import time
import csv
import os
import sys
import json
from datetime import datetime, date
from copy import deepcopy
import sqlite3
import hashlib
import secrets
import ctypes
# openpyxl optionnel — installé si absent
try:
import openpyxl
except ImportError:
os.system("py -m pip install openpyxl --proxy http://proxy.sanef.fr:8080 -q")
import openpyxl
# pyVmomi optionnel — pour vSphere
try:
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
VSPHERE_OK = True
except ImportError:
VSPHERE_OK = False
# ==============================================================================
# CONSTANTES
# ==============================================================================
# Application version string (displayed in the login dialog footer).
VERSION = "2.0"
# vCenter hostnames known to the tool.
# NOTE(review): presumably the servers queried for VM snapshots — the consumer
# is outside this part of the file; confirm.
VSPHERE_HOSTS = [
    "vpgesavcs1.sanef.groupe",
    "vpmetavcs1.sanef.groupe",
    "vpsicavcs1.sanef.groupe",
]
# yum "--exclude" flags used by build_yum_command() for every domain except
# "flux libre": packages matching these globs are left out of a global update.
YUM_EXCLUDES_STD = (
    "--exclude=*mongodb* --exclude=*mysql* --exclude=*postgres* "
    "--exclude=*mariadb* --exclude=*oracle* --exclude=*pgdg* --exclude=*php* "
    "--exclude=*java* --exclude=*redis* --exclude=*elasticsearch* --exclude=*nginx* "
    "--exclude=*mod_ssl* --exclude=*haproxy* --exclude=*certbot* "
    "--exclude=*python-certbot* --exclude=*docker* --exclude=*podman* "
    "--exclude=*centreon* --exclude=*qwserver* "
    "--exclude=*ansible* --exclude=*node* --exclude=*tina* --exclude=*memcached* "
    "--exclude=*nextcloud* --exclude=*pgbouncer* --exclude=*pgpool* "
    "--exclude=*pgbadger* --exclude=*psycopg2* --exclude=*barman* --exclude=*kibana* "
    "--exclude=*sdcss*"  # Symantec DCS agent + sdcss-kmod kernel module
)
# Exclude flags used by build_yum_command() when the domain contains "flux libre".
YUM_EXCLUDES_FLUX_LIBRE = "--exclude=*podman*"
# Shell snippet sent to a target host before patching. It writes
# /tmp/secops_pre_patching.sh through a quoted heredoc and runs it with bash.
# The script records, under /tmp and suffixed with the host name:
#   - the running systemd services (secops_services_avant_<host>.txt),
#   - the raw `ss -tlnup` output (secops_ports_avant_<host>.txt),
#   - "port process" pairs for ports below 32768 (secops_ports_detail_avant_<host>.txt),
# then prints the marker "PRE_PATCH_OK".
# The three-argument awk match() used here is a gawk extension.
PRE_PATCH_SCRIPT = r"""
cat > /tmp/secops_pre_patching.sh << 'EOF'
HOSTNAME=$(hostname)
SNAPSHOT_DIR="/tmp"
systemctl list-units --type=service --state=running --no-pager \
| awk '{print $1}' | grep '\.service$' \
> ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt
echo "Services sauvegardes : $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt)"
ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt
ss -tlnup | awk 'NR>1 && $7 != "" {
match($7, /users:\(\("([^"]+)"/, arr)
split($5, addr, ":")
port = addr[length(addr)]
if (arr[1] != "" && port+0 < 32768) print port, arr[1]
}' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt
echo "Ports sauvegardes : $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt)"
echo "PRE_PATCH_OK"
EOF
bash /tmp/secops_pre_patching.sh
"""
# Shell snippet sent to a target host after patching. It writes
# /tmp/secops_post_patching.sh through a quoted heredoc and runs it with bash
# (bash is required: the script uses process substitution "<(...)").
# The script re-captures running services and listening ports, compares them
# with `comm` against the "avant" files written by PRE_PATCH_SCRIPT, and
# appends the differences (disappeared = "ALERTE", appeared = "WARN") to
# /tmp/rapport_patching_<host>_<timestamp>.txt. "user@" units are ignored for
# both directions; "setroubleshootd" is additionally ignored for new services.
# It ends by printing the report path and the marker "POST_PATCH_OK".
# The GREEN/RED/YELLOW/NC colour variables are defined but not used.
POST_PATCH_SCRIPT = r"""
cat > /tmp/secops_post_patching.sh << 'EOF'
HOSTNAME=$(hostname)
SNAPSHOT_DIR="/tmp"
GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m'
RAPPORT="/tmp/rapport_patching_${HOSTNAME}_$(date +%Y%m%d_%H%M).txt"
systemctl list-units --type=service --state=running --no-pager \
| awk '{print $1}' | grep '\.service$' \
> ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt
DISPARUS_SVC=$(comm -23 \
<(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \
| grep -v "user@")
APPARUS_SVC=$(comm -13 \
<(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \
| grep -v "setroubleshootd\|user@")
echo "--- SERVICES ---" | tee -a ${RAPPORT}
echo "Avant: $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) | Apres: $(wc -l < ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT}
[ -z "$DISPARUS_SVC" ] && echo "OK - Aucun service disparu" | tee -a ${RAPPORT} || { echo "ALERTE services disparus:" | tee -a ${RAPPORT}; echo "$DISPARUS_SVC" | tee -a ${RAPPORT}; }
[ -z "$APPARUS_SVC" ] && echo "OK - Aucun nouveau service" | tee -a ${RAPPORT} || { echo "WARN nouveaux services:" | tee -a ${RAPPORT}; echo "$APPARUS_SVC" | tee -a ${RAPPORT}; }
ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt
ss -tlnup | awk 'NR>1 && $7 != "" {
match($7, /users:\(\("([^"]+)"/, arr)
split($5, addr, ":")
port = addr[length(addr)]
if (arr[1] != "" && port+0 < 32768) print port, arr[1]
}' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt
PORTS_DISPARUS=$(comm -23 \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt))
PORTS_APPARUS=$(comm -13 \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt))
echo "--- PORTS ---" | tee -a ${RAPPORT}
echo "Avant: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt) | Apres: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT}
[ -z "$PORTS_DISPARUS" ] && echo "OK - Aucun port disparu" | tee -a ${RAPPORT} || { echo "ALERTE ports disparus:" | tee -a ${RAPPORT}; echo "$PORTS_DISPARUS" | tee -a ${RAPPORT}; }
[ -z "$PORTS_APPARUS" ] && echo "OK - Aucun nouveau port" | tee -a ${RAPPORT} || { echo "WARN nouveaux ports:" | tee -a ${RAPPORT}; echo "$PORTS_APPARUS" | tee -a ${RAPPORT}; }
echo "" | tee -a ${RAPPORT}
echo "RAPPORT: ${RAPPORT}"
echo "POST_PATCH_OK"
EOF
bash /tmp/secops_post_patching.sh
"""
# Shell snippet sent to a "flux libre" host before patching. It writes
# /tmp/secops_fl_pre_patch.sh through a quoted heredoc and runs it with bash.
# The script:
#   - finds the account owning conmon/podman processes (prints "FL_NO_PODS"
#     and stops when there is none, otherwise "FL_USER=<account>"),
#   - saves the names of that account's running pods to
#     /tmp/fl_pods_avant_<host>_<timestamp>.txt and prints "FL_PODS_SAVED=<count>",
#   - prints the `boo_manage` status when that tool is available,
#   - prints the marker "FL_PRE_OK".
# The snapshot path is computed once and reused: globbing it back
# ("wc -l < ..._*.txt") is an "ambiguous redirect" in bash as soon as an older
# snapshot exists, and evaluating `date` twice can straddle a minute boundary.
# The file name pattern is unchanged, so FL_POST_PATCH_SCRIPT still finds it.
FL_PRE_PATCH_SCRIPT = r"""
cat > /tmp/secops_fl_pre_patch.sh << 'FLEOF'
# Detecter l'utilisateur applicatif podman
APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1)
if [ -z "$APP_USER" ]; then
echo "FL_NO_PODS"
exit 0
fi
echo "FL_USER=$APP_USER"
# Snapshot pods running via sudo su
SNAPSHOT_FILE="/tmp/fl_pods_avant_$(hostname)_$(date +%Y%m%d_%H%M).txt"
sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}" --filter "status=running"' \
> "$SNAPSHOT_FILE"
echo "FL_PODS_SAVED=$(wc -l < "$SNAPSHOT_FILE")"
# Status boo_manage si disponible
if command -v boo_manage &>/dev/null || sudo su - $APP_USER -c "which boo_manage" &>/dev/null; then
sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true
fi
echo "FL_PRE_OK"
FLEOF
bash /tmp/secops_fl_pre_patch.sh
"""
# Shell snippet sent to a "flux libre" host after patching. It writes
# /tmp/secops_fl_post_patch.sh through a quoted heredoc and runs it with bash.
# The script:
#   - finds the account owning conmon/podman processes ("FL_NO_PODS" if none),
#   - picks the most recent /tmp/fl_pods_avant_<host>_*.txt snapshot; without
#     one it prints "FL_NO_SNAPSHOT", asks boo_manage to start everything and stops,
#   - for every pod listed in the snapshot that `podman ps` no longer reports,
#     prints "FL_RESTART=<pod>" and starts it with boo_manage, falling back to
#     `podman start`,
#   - prints "FL_ALL_OK" or "FL_RESTARTED=<n>", the boo_manage status, and the
#     marker "FL_POST_OK".
# NOTE(review): with an empty snapshot file the here-string still feeds one
# empty line to the loop, so an empty pod name may be "restarted" — confirm
# whether that case can occur.
FL_POST_PATCH_SCRIPT = r"""
cat > /tmp/secops_fl_post_patch.sh << 'FLEOF'
APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1)
if [ -z "$APP_USER" ]; then
echo "FL_NO_PODS"
exit 0
fi
echo "FL_USER=$APP_USER"
SNAPSHOT=$(ls -t /tmp/fl_pods_avant_$(hostname)_*.txt 2>/dev/null | head -1)
if [ -z "$SNAPSHOT" ]; then
echo "FL_NO_SNAPSHOT"
sudo su - $APP_USER -c "boo_manage -a -c start" 2>/dev/null || true
exit 0
fi
AVANT=$(cat $SNAPSHOT)
APRES=$(sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}"')
ARRETES=0
while IFS= read -r pod; do
if ! echo "$APRES" | grep -q "^${pod}$"; then
echo "FL_RESTART=$pod"
sudo su - $APP_USER -c "boo_manage -p ${pod} -c start" 2>/dev/null || \
sudo su - $APP_USER -c "XDG_RUNTIME_DIR=/run/user/\$(id -u) podman start ${pod}" 2>/dev/null
ARRETES=$((ARRETES + 1))
fi
done <<< "$AVANT"
if [ $ARRETES -eq 0 ]; then
echo "FL_ALL_OK"
else
echo "FL_RESTARTED=$ARRETES"
fi
sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true
echo "FL_POST_OK"
FLEOF
bash /tmp/secops_fl_post_patch.sh
"""
# Named package presets: label -> space-separated package list.
# The empty string is the "global patch" preset and "CUSTOM" is a sentinel for
# the free-text entry.
# NOTE(review): presumably the value ends up as the `packages` argument of
# build_yum_command(); the consumer is outside this part of the file.
PRESET_PACKAGES = {
    "Patch global (avec excludes)": "",
    "Java": "java-1.8.0-openjdk java-1.8.0-openjdk-headless",
    "OpenSSL": "openssl",
    "OpenSSH": "openssh openssh-server openssh-clients",
    "gnutls + libsoup + rhc": "gnutls libsoup rhc",
    "glibc": "glibc",
    "mariadb-libs": "mariadb-libs",
    "curl": "curl libcurl",
    "Personnalise...": "CUSTOM",
}
# JSON file in the user's home directory read by load_settings() and written
# by save_settings().
SETTINGS_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_settings.json")
# SQLite file in the user's home directory opened by the Database class
# (user accounts and audit log).
DB_PATH = os.path.join(os.path.expanduser("~"), ".patch_manager.db")
# ==============================================================================
# DATABASE — Auth + Audit
# ==============================================================================
# Module-wide lock taken by the Database class around every use of its SQLite
# connection, which is opened with check_same_thread=False.
_db_lock = threading.Lock()
def _hash_password(password, salt=None):
if salt is None:
salt = secrets.token_hex(16)
h = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 310_000)
return f"{salt}${h.hex()}"
def _verify_password(password, stored):
if "$" not in stored:
return False
salt, _ = stored.split("$", 1)
return _hash_password(password, salt) == stored
class Database:
    """SQLite store for user accounts and the audit log.

    One connection is shared by all threads (``check_same_thread=False``), so
    every access goes through the module-level ``_db_lock``. Writes run
    execute + commit inside a single lock acquisition and reads fetch their
    rows before releasing it, so two threads can no longer interleave inside
    each other's statement/commit pair.
    """

    def __init__(self):
        """Open (creating if needed) the database at ``DB_PATH`` and its schema."""
        self.conn = sqlite3.connect(DB_PATH, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self._init_tables()

    # -- low-level helpers ----------------------------------------------------

    def _exec(self, sql, params=()):
        """Execute one statement under the lock and return the cursor.

        Kept for existing callers; new code should prefer ``_fetch_one``,
        ``_fetch_all`` or ``_write``, which finish their work under the lock.
        """
        with _db_lock:
            return self.conn.execute(sql, params)

    def _commit(self):
        """Commit the current transaction under the lock."""
        with _db_lock:
            self.conn.commit()

    def _fetch_one(self, sql, params=()):
        """Return the first row of a query (or None), fetched under the lock."""
        with _db_lock:
            return self.conn.execute(sql, params).fetchone()

    def _fetch_all(self, sql, params=()):
        """Return all rows of a query, fetched under the lock."""
        with _db_lock:
            return self.conn.execute(sql, params).fetchall()

    def _write(self, sql, params=()):
        """Execute one write statement and commit it atomically under the lock.

        Rolls back and re-raises when the statement or the commit fails.
        """
        with _db_lock:
            try:
                self.conn.execute(sql, params)
                self.conn.commit()
            except Exception:
                self.conn.rollback()
                raise

    def _init_tables(self):
        """Create the ``users`` and ``audit_log`` tables and seed the admin account."""
        with _db_lock:
            self.conn.executescript("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    role TEXT NOT NULL DEFAULT 'operator',
                    must_change_pwd INTEGER NOT NULL DEFAULT 1,
                    failed_attempts INTEGER NOT NULL DEFAULT 0,
                    locked INTEGER NOT NULL DEFAULT 0,
                    created_at TEXT DEFAULT (datetime('now','localtime')),
                    last_login TEXT
                );
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT DEFAULT (datetime('now','localtime')),
                    username TEXT,
                    action TEXT NOT NULL,
                    details TEXT
                );
            """)
            # Default admin account, created only when missing.
            # NOTE(review): the bootstrap password is hard-coded in source and
            # therefore known to anyone who can read this file; the row is
            # inserted with must_change_pwd=1. Consider generating it at first
            # run instead.
            row = self.conn.execute("SELECT id FROM users WHERE username='admin'").fetchone()
            if not row:
                h = _hash_password("pzBL4l9p24t*$zbYnxHs1mBvp")
                self.conn.execute(
                    "INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,?)",
                    ("admin", h, "admin", 1))
                self.conn.commit()

    # -- authentication ---------------------------------------------------------

    def authenticate(self, username, password):
        """Check credentials and maintain the failed-attempt counter.

        The user name is matched case-insensitively. An account is locked
        once three consecutive attempts have failed.

        Returns:
            ``(user_dict, None)`` on success, ``(None, message)`` otherwise.
            ``user_dict`` is the full ``users`` row as a dict.
        """
        row = self._fetch_one("SELECT * FROM users WHERE LOWER(username)=LOWER(?)", (username,))
        if not row:
            return None, "Utilisateur inconnu"
        if row["locked"]:
            return None, "Compte verrouille (3 tentatives). Contactez l'admin."
        if not _verify_password(password, row["password_hash"]):
            attempts = row["failed_attempts"] + 1
            if attempts >= 3:
                self._write("UPDATE users SET failed_attempts=?, locked=1 WHERE id=?", (attempts, row["id"]))
            else:
                self._write("UPDATE users SET failed_attempts=? WHERE id=?", (attempts, row["id"]))
            remaining = 3 - attempts
            if remaining <= 0:
                return None, "Compte verrouille apres 3 echecs."
            return None, f"Mot de passe incorrect ({remaining} essai(s) restant(s))"
        # Success: reset the counter and stamp the login time.
        self._write("UPDATE users SET failed_attempts=0, last_login=datetime('now','localtime') WHERE id=?",
                    (row["id"],))
        return dict(row), None

    # -- account management -----------------------------------------------------
    # NOTE(review): the methods below match ``username`` exactly, whereas
    # authenticate() is case-insensitive — callers should pass the stored name.

    def change_password(self, username, new_password):
        """Store a new password hash and clear the must-change flag."""
        h = _hash_password(new_password)
        self._write("UPDATE users SET password_hash=?, must_change_pwd=0 WHERE username=?", (h, username))

    def create_user(self, username, password, role="operator"):
        """Insert a new account (must change password at first login).

        Returns:
            True when created, False when the user name already exists.
        """
        try:
            h = _hash_password(password)
            self._write("INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,1)",
                        (username, h, role))
            return True
        except sqlite3.IntegrityError:
            return False

    def delete_user(self, username):
        """Delete an account; the ``admin`` account is never deleted."""
        self._write("DELETE FROM users WHERE username=? AND username != 'admin'", (username,))

    def unlock_user(self, username):
        """Clear the lock flag and the failed-attempt counter."""
        self._write("UPDATE users SET locked=0, failed_attempts=0 WHERE username=?", (username,))

    def reset_password(self, username, new_password):
        """Set a new password, force a change at next login and unlock the account."""
        h = _hash_password(new_password)
        self._write("UPDATE users SET password_hash=?, must_change_pwd=1, locked=0, failed_attempts=0 WHERE username=?",
                    (h, username))

    def list_users(self):
        """Return every ``users`` row as a dict, ordered by user name."""
        return [dict(r) for r in self._fetch_all("SELECT * FROM users ORDER BY username")]

    # -- audit log --------------------------------------------------------------

    def log_action(self, username, action, details=""):
        """Append one entry to the audit log."""
        self._write("INSERT INTO audit_log (username, action, details) VALUES (?,?,?)",
                    (username, action, details))

    def get_logs(self, limit=500):
        """Return the ``limit`` most recent audit entries as dicts, newest first."""
        return [dict(r) for r in self._fetch_all(
            "SELECT * FROM audit_log ORDER BY id DESC LIMIT ?", (limit,))]
# ==============================================================================
# SPLUNK HEC — envoi de logs vers Splunk
# ==============================================================================
def send_to_splunk(settings, event_data):
    """Send one event to Splunk through the HTTP Event Collector (best effort).

    Nothing is sent when Splunk is disabled or when the URL or token is
    missing. Any error while building or sending the request is swallowed on
    purpose: logging must never block patching.

    Args:
        settings: Settings dict. Keys read: ``splunk_enabled`` (falsy or the
            string ``"false"`` disables sending), ``splunk_url``,
            ``_splunk_token_session`` (falls back to ``splunk_token`` when the
            key is absent), ``splunk_index`` (default ``"main"``),
            ``proxy_url`` and the optional ``splunk_verify_tls``.
        event_data: JSON-serialisable dict sent as the ``event`` field.

    TLS behaviour: through a proxy the default verification applies; without
    a proxy the host-name check is disabled but the certificate chain is still
    verified (this is what the previous code effectively did — its
    ``ctx.verify_peer = False`` was not an ``SSLContext`` attribute and had no
    effect). Set ``splunk_verify_tls`` to ``False``/``"false"`` to turn
    certificate verification off explicitly.
    """
    enabled = settings.get("splunk_enabled")
    if not enabled or enabled == "false":
        return
    url = str(settings.get("splunk_url") or "").strip()
    token = settings.get("_splunk_token_session", settings.get("splunk_token", ""))
    token = str(token or "").strip()
    if not url or not token:
        return
    try:
        import ssl
        import urllib.request

        payload = json.dumps({
            "index": settings.get("splunk_index", "main"),
            "sourcetype": "patch_manager",
            "source": "sanef_patch_manager_v2",
            "host": socket.gethostname(),
            "event": event_data,
        }).encode("utf-8")
        req = urllib.request.Request(
            url.rstrip("/") + "/services/collector/event",
            data=payload,
            headers={
                "Authorization": f"Splunk {token}",
                "Content-Type": "application/json",
            },
            method="POST")

        ctx = ssl.create_default_context()
        handlers = []
        proxy_url = settings.get("proxy_url", "")
        if proxy_url:
            # SANEF proxy, when configured.
            handlers.append(urllib.request.ProxyHandler({"https": proxy_url, "http": proxy_url}))
        else:
            ctx.check_hostname = False
        if settings.get("splunk_verify_tls", True) in (False, "false"):
            # check_hostname must be cleared before verify_mode can be CERT_NONE.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        handlers.append(urllib.request.HTTPSHandler(context=ctx))
        opener = urllib.request.build_opener(*handlers)
        # Close the response so the connection is not leaked.
        with opener.open(req, timeout=5):
            pass
    except Exception:
        pass  # Silent by design — never block patching because of a log
# ==============================================================================
# WINDOW CENTERING — toujours sur le meme ecran
# ==============================================================================
def center_window(win, width=None, height=None, parent=None):
    """Size ``win`` and centre it on a monitor's work area.

    On Windows the monitor is the one containing the centre of ``parent``
    or, without a parent, the one under the mouse cursor. When the Win32
    calls are unavailable or fail, the window is centred on ``parent`` (or on
    the Tk screen when there is no parent).

    Args:
        win: Tk window to place.
        width: Target width in pixels; defaults to the window's current width.
        height: Target height in pixels; defaults to the current height.
        parent: Optional Tk window used to choose the monitor.
    """
    win.update_idletasks()
    if width is None:
        width = win.winfo_width()
    if height is None:
        height = win.winfo_height()
    try:
        # Private handle to user32 so the prototypes set below do not leak
        # into other users of ctypes.windll.user32. Raises AttributeError off
        # Windows, which sends us to the fallback branch.
        user32 = ctypes.WinDLL("user32")

        class POINT(ctypes.Structure):
            _fields_ = [("x", ctypes.c_long), ("y", ctypes.c_long)]

        class RECT(ctypes.Structure):
            _fields_ = [("left", ctypes.c_long), ("top", ctypes.c_long),
                        ("right", ctypes.c_long), ("bottom", ctypes.c_long)]

        class MONITORINFO(ctypes.Structure):
            _fields_ = [("cbSize", ctypes.c_ulong), ("rcMonitor", RECT),
                        ("rcWork", RECT), ("dwFlags", ctypes.c_ulong)]

        # Explicit prototypes: without them the HMONITOR handle is squeezed
        # through a C int on the way out of MonitorFromPoint.
        user32.GetCursorPos.argtypes = [ctypes.POINTER(POINT)]
        user32.GetCursorPos.restype = ctypes.c_int
        user32.MonitorFromPoint.argtypes = [POINT, ctypes.c_ulong]
        user32.MonitorFromPoint.restype = ctypes.c_void_p
        user32.GetMonitorInfoW.argtypes = [ctypes.c_void_p, ctypes.POINTER(MONITORINFO)]
        user32.GetMonitorInfoW.restype = ctypes.c_int

        MONITOR_DEFAULTTONEAREST = 2
        if parent is not None:
            # Use the monitor showing the centre of the parent window.
            px = parent.winfo_x() + parent.winfo_width() // 2
            py = parent.winfo_y() + parent.winfo_height() // 2
            pt = POINT(px, py)
        else:
            # Use the monitor under the mouse cursor.
            pt = POINT()
            if not user32.GetCursorPos(ctypes.byref(pt)):
                raise OSError("GetCursorPos failed")
        hmon = user32.MonitorFromPoint(pt, MONITOR_DEFAULTTONEAREST)
        if not hmon:
            raise OSError("MonitorFromPoint failed")
        mi = MONITORINFO()
        mi.cbSize = ctypes.sizeof(MONITORINFO)
        # A failed call leaves rcWork zeroed; treat it as an error instead of
        # centring on an empty rectangle.
        if not user32.GetMonitorInfoW(hmon, ctypes.byref(mi)):
            raise OSError("GetMonitorInfoW failed")
        work = mi.rcWork
        sx = work.left + (work.right - work.left - width) // 2
        sy = work.top + (work.bottom - work.top - height) // 2
    except Exception:
        # Non-Windows platform or Win32 failure: plain Tk geometry.
        if parent is not None:
            sx = parent.winfo_x() + (parent.winfo_width() - width) // 2
            sy = parent.winfo_y() + (parent.winfo_height() - height) // 2
        else:
            sx = (win.winfo_screenwidth() - width) // 2
            sy = (win.winfo_screenheight() - height) // 2
    win.geometry(f"{width}x{height}+{sx}+{sy}")
# ==============================================================================
# SETTINGS
# ==============================================================================
# Default application settings; load_settings() overlays the values stored in
# SETTINGS_FILE on top of these.
DEFAULT_SETTINGS = {
    # Private key files tried in turn for direct SSH connections (see ssh_connect).
    "keyfile": r"C:\scripts\id_rsa_cybglobal.pem",
    "keyfile2": r"C:\scripts\id_rsa_cybsecope.ppk",
    # CyberArk account, target account and PSMP gateway used for production hosts.
    "cybr_user": "CYBP01336",
    "target_user": "cybsecope",
    "psmp": "psmp.sanef.fr",
    # SSH connect timeout in seconds.
    "timeout": 20,
    "parallelism": 3,
    "excel_file": "",
    "patcher": "",
    "vs_user": "",
    # Splunk HEC forwarding (see send_to_splunk); "true"/"false" as strings.
    "splunk_enabled": "false",
    "splunk_url": "",
    "splunk_index": "main",
    "proxy_url": "http://proxy.sanef.fr:8080",
}
def load_settings():
    """Return the application settings: defaults overlaid with the saved file.

    A missing, unreadable or malformed ``SETTINGS_FILE`` (including one whose
    top-level JSON value is not an object) is ignored and the defaults are
    returned.

    Returns:
        A new dict; ``DEFAULT_SETTINGS`` itself is never modified, so saved
        values cannot leak into the defaults for the rest of the process.
    """
    merged = dict(DEFAULT_SETTINGS)
    try:
        # utf-8-sig also accepts files saved with a BOM by a text editor.
        with open(SETTINGS_FILE, "r", encoding="utf-8-sig") as f:
            stored = json.load(f)
    except (OSError, ValueError):
        return merged
    if isinstance(stored, dict):
        merged.update(stored)
    return merged
def save_settings(settings):
    """Write ``settings`` to ``SETTINGS_FILE`` as indented JSON (best effort).

    The dict is serialised before the file is opened, so a value that cannot
    be serialised no longer truncates the existing settings file. Failures
    are deliberately silent, as before.

    Args:
        settings: JSON-serialisable settings dict.
    """
    try:
        payload = json.dumps(settings, indent=2)
        with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
            f.write(payload)
    except (OSError, TypeError, ValueError, RecursionError):
        pass
# ==============================================================================
# EXCEL READER
# ==============================================================================
def get_week_sheet(wb):
    """Return the name of the sheet matching the current ISO week, if any.

    The names ``S<n>``, ``S<nn>``, ``Semaine <n>``, ``W<n>`` and ``<n>`` are
    tried in that order against ``wb.sheetnames``.

    Returns:
        The first matching sheet name, or None so that the caller can let the
        user choose among the available sheets.
    """
    week = date.today().isocalendar()[1]
    patterns = ("S{}", "S{:02d}", "Semaine {}", "W{}", "{}")
    wanted = (pattern.format(week) for pattern in patterns)
    return next((name for name in wanted if name in wb.sheetnames), None)
# ARGB fill colour that read_excel_servers() treats as "already patched" when
# it is the background of a server-name cell.
EXCEL_GREEN = "FF00B050"
def _open_workbook_resilient(filepath, tmp_dir):
    """Open ``filepath`` with openpyxl, working around files locked by OneDrive.

    Tries, in order: a copy placed in ``tmp_dir``, an in-memory copy of the
    bytes, then the file itself. Only a failure of the last attempt is raised.
    """
    import io
    import shutil

    tmp_path = os.path.join(tmp_dir, os.path.basename(filepath))
    # Method 1: work on a temporary copy of the file.
    try:
        shutil.copy2(filepath, tmp_path)
        return openpyxl.load_workbook(tmp_path, data_only=True)
    except Exception:
        pass
    # Method 2: read the bytes and load from memory.
    try:
        with open(filepath, "rb") as f:
            return openpyxl.load_workbook(io.BytesIO(f.read()), data_only=True)
    except Exception:
        pass
    # Method 3: open the file directly; errors propagate to the caller.
    return openpyxl.load_workbook(filepath, data_only=True)


def _parse_patch_date(raw):
    """Convert a date cell value to a ``date``; return None when it cannot be read.

    Accepts ``datetime``/``date`` objects and text starting with
    ``YYYY-MM-DD`` or ``DD/MM/YYYY``.
    """
    if isinstance(raw, datetime):
        return raw.date()
    if isinstance(raw, date):
        return raw
    if not raw:
        return None
    text = str(raw).strip()[:10]
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None


def _parse_accord(raw):
    """Return ``"oui"`` only for an explicit approval (True or the text "oui")."""
    if isinstance(raw, bool):
        return "oui" if raw else "non"
    if raw is None:
        return "non"
    return "oui" if str(raw).strip().lower() == "oui" else "non"


def _has_green_fill(cell):
    """Tell whether ``cell`` has the ``EXCEL_GREEN`` background (already patched)."""
    try:
        fill = cell.fill
        bg = fill.fgColor.rgb if fill and fill.fgColor else ""
    except Exception:
        # Style lookup is best effort: an unreadable style means "not green".
        return False
    return isinstance(bg, str) and bg.upper() == EXCEL_GREEN


def read_excel_servers(filepath, sheet_name=None):
    """Read the patching workbook and return the Linux servers it lists.

    The workbook is opened from a temporary copy when possible, to avoid
    conflicts with OneDrive. The header row is the first of the first five
    rows holding at least three non-empty cells; columns are located by
    header text (case-insensitive; for each keyword an exact header match is
    preferred over a substring match, so that "os" does not pick "hostname").
    Only rows whose OS cell contains "linux" and whose server cell is not
    empty are returned.

    Args:
        filepath: Path of the ``.xlsx`` file.
        sheet_name: Sheet to read; when None the sheet of the current week is
            looked up with ``get_week_sheet``.

    Returns:
        ``(servers, None)`` on success, ``servers`` being a list of dicts;
        ``(None, sheetnames)`` when no sheet was given and none matches the
        current week; ``([], sheetnames)`` when no header row was found.

    Raises:
        Whatever ``openpyxl.load_workbook`` raises when the file cannot be
        opened by any of the three methods, and ``KeyError`` for an unknown
        ``sheet_name``.
    """
    import shutil
    import tempfile

    tmp_dir = tempfile.mkdtemp()
    wb = None
    try:
        wb = _open_workbook_resilient(filepath, tmp_dir)
        if sheet_name is None:
            sheet_name = get_week_sheet(wb)
        if sheet_name is None:
            return None, wb.sheetnames  # let the user pick a sheet
        ws = wb[sheet_name]

        # Header row: first row (within the first five) with >= 3 filled cells.
        headers = {}
        header_row = None
        for row_idx, row in enumerate(ws.iter_rows(max_row=5), start=1):
            filled = [c for c in row if c.value is not None]
            if len(filled) >= 3:
                headers = {str(c.value).strip().lower(): c.column - 1 for c in filled}
                header_row = row_idx
                break
        if not headers:
            return [], wb.sheetnames

        def find_col(keys):
            # Keywords are tried in priority order; for each one an exact
            # header match wins over a substring match.
            for key in keys:
                key = key.lower()
                if key in headers:
                    return headers[key]
                for header, idx in headers.items():
                    if key in header:
                        return idx
            return None

        col_server = find_col(["asset name", "nom du serveur", "serveur", "hostname", "server", "nom"])
        col_os = find_col(["os", "système", "systeme"])
        col_env = find_col(["environnement", "environment", "env"])
        col_domain = find_col(["domaine", "domain"])
        col_app = find_col(["nom complet", "application", "nom_complet"])
        col_accord = find_col(["accord"])
        col_date = find_col(["date du patch", "date patch", "date heure", "date_heure", "date patching", "date"])
        col_patcher = find_col(["intervenant", "patcheur", "technicien"])

        def raw_value(vals, col):
            # Cell value at ``col`` or None when the column is absent.
            if col is None or col >= len(vals):
                return None
            return vals[col]

        def text(vals, col):
            value = raw_value(vals, col)
            return str(value).strip() if value is not None else ""

        servers = []
        for row in ws.iter_rows(min_row=header_row + 1):
            cells = list(row)
            vals = [c.value for c in cells]
            if all(v is None for v in vals):
                continue
            os_val = text(vals, col_os)
            if "linux" not in os_val.lower():
                continue
            server_name = text(vals, col_server)
            if not server_name:
                continue
            # A green server cell means the host was already patched.
            already_patched = (col_server is not None and col_server < len(cells)
                               and _has_green_fill(cells[col_server]))
            servers.append({
                "server": server_name,
                "os": os_val,
                "env": text(vals, col_env),
                "domain": text(vals, col_domain),
                "app": text(vals, col_app),
                # Strict rule: missing column or empty cell means no approval.
                "accord": _parse_accord(raw_value(vals, col_accord)),
                "date_patch": _parse_patch_date(raw_value(vals, col_date)),
                "patcher": text(vals, col_patcher),
                "selected": False,
                "snap_done": False,
                "is_physical": False,
                "status": "✅ DÉJÀ PATCHÉ" if already_patched else "",
                "patch_status": "PATCHED" if already_patched else "",
                "patch_detail": "Fond vert Excel" if already_patched else "",
                "reboot_required": False,
                "already_patched": already_patched,
                "excel_row": cells[0].row if cells else None,
            })
        return servers, None
    finally:
        # Runs on every exit path, so neither the workbook nor the temporary
        # copy is left behind on early returns or errors.
        if wb is not None:
            try:
                wb.close()
            except Exception:
                pass
        shutil.rmtree(tmp_dir, ignore_errors=True)
# ==============================================================================
# SSH HELPERS
# ==============================================================================
def build_fqdn(server, env):
    """Return the host names to try for ``server``, in order.

    A name that already contains a dot is returned alone. Otherwise the short
    name is followed by its FQDN: ``<server>.sanef-rec.fr`` when ``env``
    contains "rec" (case-insensitive), ``<server>.sanef.groupe`` otherwise.
    """
    if "." in server:
        return [server]
    # "recette" contains "rec", so a single substring test covers both.
    suffix = "sanef-rec.fr" if "rec" in env.lower() else "sanef.groupe"
    return [server, f"{server}.{suffix}"]
def load_key(keyfile):
    """Load a private key file with paramiko.

    Surrounding double then single quotes are stripped from the path. The
    file is tried as an RSA, then Ed25519, then ECDSA key.

    Returns:
        The first key object that loads, or None when the path is empty, the
        file does not exist, or no key type accepts it.
    """
    path = keyfile.strip('"').strip("'") if keyfile else ""
    if not path or not os.path.exists(path):
        return None
    for key_cls in (paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey):
        try:
            return key_cls.from_private_key_file(path)
        except Exception:
            pass
    return None
def ssh_connect(server, env, settings, pkey, pkey2, cyb_password=None):
    """Open an SSH session to ``server``, trying each candidate host name.

    When ``env`` contains "prod" and ``cyb_password`` is given, the session
    goes through the CyberArk PSMP gateway (``settings["psmp"]``, port 22)
    with keyboard-interactive authentication as
    ``<cybr_user>@<target_user>@<host>``. Otherwise a direct connection as
    ``settings["target_user"]`` is attempted with ``pkey`` then ``pkey2``.

    Every failed attempt now closes its transport/client before the next one
    is tried, so half-open sockets and paramiko threads are not left behind.

    Returns:
        ``(client, hostname)`` for the first attempt that succeeds, with
        ``client._eff_host`` set to that host name, or ``(None, None)``.
    """
    candidates = build_fqdn(server, env)
    # "production" contains "prod", so one substring test is enough.
    is_prod = "prod" in env.lower()
    for hostname in candidates:
        if is_prod and cyb_password:
            # CyberArk keyboard-interactive
            username = f"{settings['cybr_user']}@{settings['target_user']}@{hostname}"

            def handler(title, instructions, prompt_list):
                # Answer every prompt with the CyberArk password.
                return [cyb_password] * len(prompt_list)

            transport = None
            try:
                transport = paramiko.Transport((settings["psmp"], 22))
                transport.connect()
                transport.auth_interactive(username, handler)
                client = paramiko.SSHClient()
                # SSHClient has no public way to adopt an existing transport.
                client._transport = transport
                client._eff_host = hostname
                return client, hostname
            except Exception:
                if transport is not None:
                    transport.close()
                continue
        else:
            # Direct SSH with a key
            for key in (pkey, pkey2):
                if not key:
                    continue
                client = paramiko.SSHClient()
                try:
                    # NOTE(review): AutoAddPolicy accepts unknown host keys
                    # without verification.
                    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                    client.connect(hostname=hostname, port=22,
                                   username=settings["target_user"],
                                   pkey=key, timeout=settings["timeout"],
                                   look_for_keys=False, allow_agent=False)
                    client._eff_host = hostname
                    return client, hostname
                except Exception:
                    client.close()
                    continue
    return None, None
def run_cmd(client, cmd, timeout=300):
    """Run ``cmd`` on ``client`` and return ``(stdout, stderr)`` as stripped text.

    Output is decoded as UTF-8, ignoring undecodable bytes. Any exception is
    reported as ``("", str(exception))`` instead of being raised.
    """
    def as_text(stream):
        return stream.read().decode("utf-8", errors="ignore").strip()

    try:
        _stdin, stdout, stderr = client.exec_command(cmd, timeout=timeout)
        return as_text(stdout), as_text(stderr)
    except Exception as exc:
        return "", str(exc)
def detect_physical(client):
    """Tell whether the remote host is a physical machine (True) or a VM (False).

    ``virt-what`` is asked first: any output other than ``UNKNOWN`` means a
    VM. Otherwise the ``dmidecode`` system manufacturer decides: a
    virtualisation vendor means VM, a hardware vendor means physical. When
    nothing matches the host is assumed to be a VM.
    """
    virt, _ = run_cmd(client, "sudo virt-what 2>/dev/null || echo UNKNOWN", 10)
    if virt and virt != "UNKNOWN":
        return False  # virt-what reported a hypervisor
    maker, _ = run_cmd(client, "sudo dmidecode -s system-manufacturer 2>/dev/null || echo UNKNOWN", 10)
    maker = maker.lower()
    vm_vendors = ("vmware", "virtualbox", "kvm", "xen", "qemu", "microsoft corporation")
    if any(vendor in maker for vendor in vm_vendors):
        return False
    hw_vendors = ("hp", "dell", "lenovo", "ibm", "supermicro", "cisco")
    # Unknown manufacturer falls through to False: assume VM by default.
    return any(vendor in maker for vendor in hw_vendors)
def build_yum_command(domain, packages, exclude_kernel, dryrun):
    """Build the yum shell command for a host.

    Args:
        domain: Business domain; one containing "flux libre" uses the
            reduced exclude list, any other uses the standard one.
        packages: Space-separated package names; when non-empty they are the
            command's target and the exclude flags are not used.
        exclude_kernel: Append ``--exclude=*kernel*`` to the exclude flags.
        dryrun: Produce a filtered ``yum check-update`` listing instead of
            ``yum update -y``.

    Returns:
        The command line as a string.
    """
    if "flux libre" in domain.lower():
        excludes = YUM_EXCLUDES_FLUX_LIBRE
    else:
        excludes = YUM_EXCLUDES_STD
    if exclude_kernel:
        excludes += " --exclude=*kernel*"
    target = packages if packages else excludes
    if not dryrun:
        return f"sudo yum update -y {target} 2>&1 | tail -25"
    # The dry run keeps only package lines and shows fewer of them when an
    # explicit package list was given.
    limit = 20 if packages else 30
    return (f"sudo yum check-update {target} 2>/dev/null "
            f"| grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' "
            f"| grep -E '^[a-z]' | head -{limit}")
# ==============================================================================
# LOGIN DIALOG
# ==============================================================================
class LoginDialog(tk.Toplevel):
    """Modal login window.

    The constructor blocks until the window is closed. Afterwards ``result``
    holds the authenticated user's row as a dict, or None when the user quit.
    """

    def __init__(self, parent, db):
        """Build the dialog, show it modally and wait for it to close.

        Args:
            parent: Parent Tk window.
            db: ``Database`` used to authenticate and to write the audit log.
        """
        super().__init__(parent)
        self.db = db
        self.result = None  # user row as a dict, or None
        self.title("SANEF Patch Manager - Connexion")
        self.configure(bg="#1e1e2e")
        self.resizable(False, False)
        self.protocol("WM_DELETE_WINDOW", self._quit)
        self._build()
        center_window(self, 420, 340)
        self.grab_set()
        self.bind("<Return>", lambda e: self._login())
        self.wait_window()

    def _build(self):
        """Create the title, the user/password fields, the message line and buttons."""
        BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"
        tk.Label(self, text="SANEF PATCH MANAGER", bg="#181825", fg=ACCENT,
                 font=("Consolas", 14, "bold"), pady=10).pack(fill="x")
        tk.Label(self, text="Authentification requise", bg=BG, fg="#6c7086",
                 font=("Consolas", 10)).pack(pady=(10, 15))
        f = tk.Frame(self, bg=BG)
        f.pack(pady=5)
        tk.Label(f, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 11),
                 width=14, anchor="w").grid(row=0, column=0, padx=8, pady=6)
        self.user_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11),
                                   insertbackground=FG, width=22)
        self.user_entry.grid(row=0, column=1, padx=8, pady=6)
        self.user_entry.focus_set()
        tk.Label(f, text="Mot de passe :", bg=BG, fg=FG, font=("Consolas", 11),
                 width=14, anchor="w").grid(row=1, column=0, padx=8, pady=6)
        self.pwd_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11),
                                  insertbackground=FG, width=22, show="*")
        self.pwd_entry.grid(row=1, column=1, padx=8, pady=6)
        # Error line shown under the fields.
        self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8",
                                  font=("Consolas", 10))
        self.msg_label.pack(pady=8)
        bf = tk.Frame(self, bg=BG)
        bf.pack(pady=10)
        tk.Button(bf, text="Connexion", bg="#40a02b", fg="white",
                  font=("Consolas", 11, "bold"), padx=20, pady=4,
                  command=self._login).pack(side="left", padx=8)
        tk.Button(bf, text="Quitter", bg="#313244", fg="#cdd6f4",
                  font=("Consolas", 10), padx=12, pady=4,
                  command=self._quit).pack(side="left", padx=8)
        tk.Label(self, text=f"SQATM Patch Manager v{VERSION}", bg=BG, fg="#45475a",
                 font=("Consolas", 9)).pack(side="bottom", pady=5)

    def _login(self):
        """Validate the form, authenticate, and close the dialog on success."""
        username = self.user_entry.get().strip()
        password = self.pwd_entry.get()
        if not username or not password:
            self.msg_label.configure(text="Saisir utilisateur et mot de passe")
            return
        user, error = self.db.authenticate(username, password)
        if error:
            self.msg_label.configure(text=error)
            self.pwd_entry.delete(0, "end")
            return
        # Authentication is case-insensitive: log the stored account name, not
        # the text as typed, so audit entries for one account stay consistent.
        self.db.log_action(user["username"], "LOGIN", f"role={user['role']}")
        self.result = user
        self.destroy()

    def _quit(self):
        """Close the dialog without a logged-in user."""
        self.result = None
        self.destroy()
# ==============================================================================
# CHANGE PASSWORD DIALOG
# ==============================================================================
class ChangePasswordDialog(tk.Toplevel):
    """Modal window asking for a new password twice.

    The constructor blocks until the window is closed; ``success`` is then
    True when the password was changed. With ``forced=True`` the window
    cannot be dismissed with the close button.
    """

    _BG = "#1e1e2e"
    _FIELD_BG = "#2a2a3e"
    _FG = "#cdd6f4"
    _ACCENT = "#89b4fa"

    def __init__(self, parent, db, username, forced=False):
        """Build the dialog, show it modally and wait for it to close.

        Args:
            parent: Parent Tk window (also used to choose the monitor).
            db: ``Database`` receiving the new password and the audit entry.
            username: Account whose password is changed.
            forced: True for the mandatory change at first login.
        """
        super().__init__(parent)
        self._parent = parent
        self.db = db
        self.username = username
        self.forced = forced
        self.success = False
        self.title("Changement de mot de passe")
        self.configure(bg=self._BG)
        self.resizable(False, False)
        if forced:
            # Ignore the window-manager close request.
            self.protocol("WM_DELETE_WINDOW", lambda: None)
        self._build()
        center_window(self, 420, 300, parent=parent)
        self.grab_set()
        self.bind("<Return>", lambda e: self._change())
        self.wait_window()

    def _password_row(self, frame, row, caption):
        """Add a caption and a masked entry on ``row`` of ``frame``; return the entry."""
        tk.Label(frame, text=caption, bg=self._BG, fg=self._FG, font=("Consolas", 10),
                 width=16, anchor="w").grid(row=row, column=0, padx=8, pady=6)
        entry = tk.Entry(frame, bg=self._FIELD_BG, fg=self._FG, font=("Consolas", 10),
                         insertbackground=self._FG, width=22, show="*")
        entry.grid(row=row, column=1, padx=8, pady=6)
        return entry

    def _build(self):
        """Create the banner, the two password fields, the message line and the button."""
        if self.forced:
            banner = "Premier login : vous devez changer votre mot de passe"
        else:
            banner = "Changement de mot de passe"
        tk.Label(self, text=banner, bg="#181825", fg=self._ACCENT,
                 font=("Consolas", 11, "bold"), pady=10).pack(fill="x")
        form = tk.Frame(self, bg=self._BG)
        form.pack(pady=15)
        self.new_entry = self._password_row(form, 0, "Nouveau mdp :")
        self.new_entry.focus_set()
        self.confirm_entry = self._password_row(form, 1, "Confirmer :")
        # Error line shown under the fields.
        self.msg_label = tk.Label(self, text="", bg=self._BG, fg="#f38ba8",
                                  font=("Consolas", 10))
        self.msg_label.pack(pady=5)
        tk.Button(self, text="Valider", bg="#40a02b", fg="white",
                  font=("Consolas", 11, "bold"), padx=20, pady=4,
                  command=self._change).pack(pady=10)

    def _change(self):
        """Validate both entries, store the new password and close the dialog."""
        new_pwd = self.new_entry.get()
        confirmation = self.confirm_entry.get()
        problem = None
        if len(new_pwd) < 4:
            problem = "Minimum 4 caracteres"
        elif new_pwd != confirmation:
            problem = "Les mots de passe ne correspondent pas"
        if problem is not None:
            self.msg_label.configure(text=problem)
            return
        self.db.change_password(self.username, new_pwd)
        self.db.log_action(self.username, "CHANGE_PASSWORD", "")
        self.success = True
        self.destroy()
# ==============================================================================
# DIALOGUE SETTINGS
# ==============================================================================
class SettingsDialog(tk.Toplevel):
    """Modal dialog used to edit the application settings.

    The constructor blocks until the window is closed. Afterwards ``result``
    is ``None`` when the dialog was dismissed without saving, or the updated
    settings dict when the user pressed "Sauvegarder". The vCenter password
    and the Splunk token typed in the dialog are returned under the
    ``_vs_pwd_session`` and ``_splunk_token_session`` keys and are never
    written to the settings file.
    """

    # Non-underscore keys that carry a secret and must not reach the JSON file.
    _SECRET_KEYS = frozenset({"splunk_token"})

    def __init__(self, parent, settings):
        """Build the dialog, centre it on *parent* and wait until it closes.

        Args:
            parent: widget the dialog is attached to and centred on.
            settings: current settings mapping; it is copied, not mutated.
        """
        super().__init__(parent)
        self._parent = parent
        self.title("Parametres")
        self.configure(bg="#1e1e2e")
        self.resizable(True, True)
        self.settings = dict(settings)
        self.result = None
        self._build()
        # Height adapted to the screen
        sh = parent.winfo_screenheight()
        h = min(780, int(sh * 0.85))
        center_window(self, 600, h, parent=parent)
        self.grab_set()
        self.wait_window()

    def _build(self):
        """Create the fixed title, the scrollable form and the buttons."""
        BG = "#1e1e2e"
        BG2 = "#2a2a3e"
        FG = "#cdd6f4"
        ACCENT = "#89b4fa"
        # Fixed title at the top
        tk.Label(self, text="PARAMETRES", bg="#313244", fg=ACCENT,
                 font=("Consolas", 13, "bold"), pady=8).pack(fill="x")
        # Scrollable area
        canvas = tk.Canvas(self, bg=BG, highlightthickness=0)
        scrollbar = ttk.Scrollbar(self, orient="vertical", command=canvas.yview)
        self.scroll_frame = tk.Frame(canvas, bg=BG)
        self.scroll_frame.bind(
            "<Configure>",
            lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
        canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw")
        canvas.configure(yscrollcommand=scrollbar.set)
        canvas.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

        # Mouse-wheel scrolling. The handler is bound on the dialog itself
        # (every descendant carries its toplevel in its bindtags) instead of
        # bind_all: a global binding would outlive the dialog, point at a
        # destroyed canvas and replace the wheel binding of the main window.
        def _on_wheel(event):
            canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
            return "break"  # do not fall through to application-wide bindings

        self.bind("<MouseWheel>", _on_wheel)
        sf = self.scroll_frame

        def section(title):
            """Add a section heading to the form."""
            tk.Label(sf, text=title, bg=BG, fg=ACCENT,
                     font=("Consolas", 10, "bold")).pack(
                         anchor="w", padx=20, pady=(12, 2))

        def row(label, key, default=""):
            """Add a labelled entry bound to ``self.var_<key>``."""
            f = tk.Frame(sf, bg=BG)
            f.pack(fill="x", padx=20, pady=4)
            tk.Label(f, text=label, bg=BG, fg=FG, font=("Consolas", 10),
                     width=22, anchor="w").pack(side="left")
            var = tk.StringVar(value=self.settings.get(key, default))
            setattr(self, f"var_{key}", var)
            tk.Entry(f, textvariable=var, bg=BG2, fg=FG,
                     font=("Consolas", 10), insertbackground=FG,
                     width=32).pack(side="left", padx=4)
            return var

        def secret_row(label):
            """Add a masked entry that is never pre-filled; return its variable."""
            f = tk.Frame(sf, bg=BG)
            f.pack(fill="x", padx=20, pady=4)
            tk.Label(f, text=label, bg=BG, fg=FG, font=("Consolas", 10),
                     width=22, anchor="w").pack(side="left")
            var = tk.StringVar(value="")
            tk.Entry(f, textvariable=var, show="*",
                     bg=BG2, fg=FG, font=("Consolas", 10),
                     insertbackground=FG, width=32).pack(side="left", padx=4)
            tk.Label(f, text="(non stocke)", bg=BG, fg="#6c7086",
                     font=("Consolas", 8)).pack(side="left", padx=4)
            return var

        section("SSH — Recette")
        row("Cle SSH principale :", "keyfile")
        row("Cle SSH secondaire :", "keyfile2")
        section("CyberArk — Production")
        row("Compte CyberArk :", "cybr_user", "CYBP01336")
        row("Utilisateur cible :", "target_user", "cybsecope")
        row("PSMP hostname :", "psmp", "psmp.sanef.fr")
        section("Identite patcheur")
        row("Nom du patcheur :", "patcher", "")
        section("vSphere — Snapshots")
        row("Utilisateur vCenter :", "vs_user", "")
        self.var_vs_pwd = secret_row("Mot de passe vCenter :")
        section("Connexion SSH")
        row("Timeout SSH (s) :", "timeout", "20")
        row("Parallelisme :", "parallelism", "3")
        section("Splunk HEC (logs)")
        self.var_splunk_enabled = tk.BooleanVar(
            value=self.settings.get("splunk_enabled", "false") == "true")
        tk.Checkbutton(sf, text="Activer l'envoi vers Splunk",
                       variable=self.var_splunk_enabled,
                       bg=BG, fg=FG, selectcolor=BG2, activebackground=BG,
                       font=("Consolas", 10)).pack(anchor="w", padx=20)
        row("URL HEC :", "splunk_url", "https://splunk.sanef.fr:8088")
        # Token is session-only, never stored in the JSON
        self.var_splunk_token = secret_row("Token HEC :")
        row("Index :", "splunk_index", "main")
        row("Proxy (optionnel) :", "proxy_url", "http://proxy.sanef.fr:8080")
        # Buttons at the bottom of the scrollable form
        bf = tk.Frame(sf, bg=BG)
        bf.pack(pady=16)
        tk.Button(bf, text="Sauvegarder", bg="#40a02b", fg="white",
                  font=("Consolas", 11, "bold"), padx=12, pady=6,
                  command=self._save).pack(side="left", padx=8)
        tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4",
                  font=("Consolas", 10), padx=12, pady=6,
                  command=self.destroy).pack(side="left", padx=8)

    def _save(self):
        """Collect the form values, persist the non-secret ones and close."""
        keys = ["keyfile", "keyfile2", "cybr_user", "target_user", "psmp",
                "timeout", "parallelism", "patcher", "vs_user", "splunk_url",
                "splunk_token", "splunk_index", "proxy_url"]
        for k in keys:
            var = getattr(self, f"var_{k}", None)
            if var is None:
                continue
            val = var.get().strip()
            if k in ("timeout", "parallelism"):
                try:
                    val = int(val)
                except ValueError:
                    # Non-numeric input: fall back to the configured default
                    val = DEFAULT_SETTINGS.get(k, 20)
            self.settings[k] = val
        # Splunk enabled flag (stored in the JSON, not sensitive)
        self.settings["splunk_enabled"] = (
            "true" if self.var_splunk_enabled.get() else "false")
        # vCenter password and Splunk token: session only
        self.settings["_vs_pwd_session"] = self.var_vs_pwd.get()
        self.settings["_splunk_token_session"] = self.var_splunk_token.get()
        self.result = self.settings
        # Persist WITHOUT secrets. "splunk_token" stays in the in-memory result
        # for callers that read it, but it used to leak into the JSON file
        # because it does not start with an underscore.
        to_save = {k: v for k, v in self.settings.items()
                   if not k.startswith("_") and k not in self._SECRET_KEYS}
        save_settings(to_save)
        self.destroy()
# ==============================================================================
# DIALOGUE CONFIRMATION COMMANDE YUM
# ==============================================================================
class YumConfirmDialog(tk.Toplevel):
    """Modal dialog showing the patch command for one server before it runs.

    The constructor blocks until the window is closed. Afterwards:

    * ``result`` is ``"apply"``, ``"apply_all"`` or ``"cancel"`` depending on
      the button pressed, or ``None`` when the window was closed otherwise;
    * ``final_cmd`` is the (possibly edited) command text. It is initialised
      to the proposed command so the attribute always exists, even when the
      dialog is closed without pressing a button.
    """

    def __init__(self, parent, server, cmd):
        """Build the dialog for *server* pre-filled with *cmd* and wait."""
        super().__init__(parent)
        self.title("Confirmation commande patch")
        self.configure(bg="#1e1e2e")
        self.result = None  # "apply", "apply_all", "cancel"
        # Previously only assigned in _set(): closing the window from the
        # title bar left the attribute undefined for the caller.
        self.final_cmd = cmd
        self.cmd_var = tk.StringVar(value=cmd)
        self._build(server, cmd)
        center_window(self, 680, 340, parent=parent)
        self.grab_set()
        self.wait_window()

    def _build(self, server, cmd):
        """Create the header, the editable command box and the three buttons."""
        BG = "#1e1e2e"
        FG = "#cdd6f4"
        ACCENT = "#89b4fa"
        YELLOW = "#f9e2af"
        tk.Label(self, text="⚠️ CONFIRMATION COMMANDE PATCH",
                 bg="#313244", fg=ACCENT, font=("Consolas", 12, "bold"),
                 pady=8).pack(fill="x")
        tk.Label(self, text=f"Serveur : {server}",
                 bg=BG, fg=YELLOW,
                 font=("Consolas", 11)).pack(anchor="w", padx=16, pady=6)
        tk.Label(self, text="Commande à exécuter :",
                 bg=BG, fg=FG, font=("Consolas", 10)).pack(anchor="w", padx=16)
        cmd_entry = tk.Text(self, height=5, bg="#11111b", fg="#a6e3a1",
                            font=("Consolas", 10), wrap="word",
                            insertbackground=FG)
        cmd_entry.insert("1.0", cmd)
        cmd_entry.pack(fill="x", padx=16, pady=4)
        self._cmd_text = cmd_entry
        bf = tk.Frame(self, bg=BG)
        bf.pack(pady=12)
        tk.Button(bf, text="▶ Appliquer",
                  bg="#40a02b", fg="white", font=("Consolas", 11, "bold"),
                  padx=10, pady=6,
                  command=lambda: self._set("apply")).pack(side="left", padx=6)
        tk.Button(bf, text="▶▶ Appliquer à TOUS",
                  bg="#1e66f5", fg="white", font=("Consolas", 11, "bold"),
                  padx=10, pady=6,
                  command=lambda: self._set("apply_all")).pack(side="left", padx=6)
        tk.Button(bf, text="✗ Annuler ce serveur",
                  bg="#d20f39", fg="white", font=("Consolas", 10),
                  padx=10, pady=6,
                  command=lambda: self._set("cancel")).pack(side="left", padx=6)

    def _set(self, val):
        """Store the user's choice and the edited command, then close."""
        self.result = val
        self.final_cmd = self._cmd_text.get("1.0", "end").strip()
        self.destroy()
# ==============================================================================
# APPLICATION PRINCIPALE
# ==============================================================================
class PatchManagerV2:
def __init__(self, root, db, current_user):
    """Initialise the main window, load settings and build the UI.

    Args:
        root: the Tk root window hosting the application.
        db: database object; ``log_action`` is used for auditing.
        current_user: mapping with at least ``"username"`` and ``"role"``.
    """
    self.root = root
    self.db = db
    self.current_user = current_user  # {"username":..., "role":...}
    # Separator added: version and username used to be glued together
    # (e.g. "v2.0admin (admin)").
    self.root.title(
        f"SANEF Linux Patch Manager v{VERSION} — "
        f"{current_user['username']} ({current_user['role']})")
    self.root.configure(bg="#1e1e2e")
    # Full screen on the monitor where the window currently is
    try:
        center_window(root, root.winfo_screenwidth(), root.winfo_screenheight(), parent=root)
        root.after(100, lambda: root.state("zoomed"))
    except Exception:
        root.state("zoomed")
    self.settings = load_settings()
    self.servers = []
    self.results = []
    self.running = False
    self.pkey = None
    self.pkey2 = None
    self.cyb_pwd = None
    self.apply_all_cmd = False
    self.is_current_week = True  # True = current week (patching allowed)
    self._build_ui()
    self._reload_keys()
    self.audit("APP_START", "")
# ==========================================================================
# COULEURS
# ==========================================================================
# Shared colour palette used by every tab of the main window.
BG = "#1e1e2e"      # main background
BG2 = "#2a2a3e"     # entry / tree background
FG = "#cdd6f4"      # default text
ACCENT = "#89b4fa"  # headings and highlights
GREEN = "#a6e3a1"   # "ok" tag
RED = "#f38ba8"     # "ko" tag
YELLOW = "#f9e2af"  # "warn" tag
ORANGE = "#fab387"  # dry-run checkbox
BTN = "#313244"     # neutral button / banner background
# ==========================================================================
# AUDIT HELPER
# ==========================================================================
def audit(self, action, details=""):
    """Record an audit entry in the database and forward it to Splunk.

    Args:
        action: short action identifier (e.g. ``"APP_START"``).
        details: optional free-text details.
    """
    user = self.current_user["username"]
    self.db.log_action(user, action, details)
    event = {
        "action": action,
        "user": user,
        "details": details,
        "timestamp": datetime.now().isoformat(),
    }
    # Sent from a daemon thread so the UI is never blocked by the network call
    sender = threading.Thread(
        target=send_to_splunk,
        args=(self.settings, event),
        daemon=True,
    )
    sender.start()
# ==========================================================================
# BUILD UI
# ==========================================================================
def _build_ui(self):
    """Configure ttk styles, then build the title bar and the tab notebook.

    The audit and user-management tabs are only added for the ``admin``
    role; the patch and post-patching tabs are disabled for ``viewer``.
    """
    bg, bg2, fg = self.BG, self.BG2, self.FG
    accent, btn = self.ACCENT, self.BTN
    bar_bg = "#181825"

    # ttk styles
    theme = ttk.Style()
    theme.theme_use("clam")
    style_options = (
        ("TFrame", dict(background=bg)),
        ("TLabel", dict(background=bg, foreground=fg, font=("Consolas",10))),
        ("Treeview", dict(background=bg2, foreground=fg,
                          fieldbackground=bg2, font=("Consolas",9), rowheight=22)),
        ("Treeview.Heading", dict(background=btn, foreground=accent,
                                  font=("Consolas",10,"bold"))),
        ("TCombobox", dict(fieldbackground=bg2, foreground=fg,
                           background=bg2, font=("Consolas",10))),
        ("TCheckbutton", dict(background=bg, foreground=fg, font=("Consolas",10))),
        ("TNotebook", dict(background=bg, borderwidth=0)),
        ("TNotebook.Tab", dict(background=btn, foreground=fg,
                               font=("Consolas",10,"bold"), padding=[12,4])),
    )
    for style_name, options in style_options:
        theme.configure(style_name, **options)
    theme.map("Treeview", background=[("selected","#45475a")])
    theme.map("TNotebook.Tab",
              background=[("selected","#313244")],
              foreground=[("selected",accent)])

    # Title bar
    title_bar = tk.Frame(self.root, bg=bar_bg, pady=6)
    title_bar.pack(fill="x")
    today = date.today()
    tk.Label(title_bar, text=f"⚡ SANEF LINUX PATCH MANAGER v{VERSION}",
             bg=bar_bg, fg=accent,
             font=("Consolas",14,"bold")).pack(side="left", padx=12)
    tk.Label(title_bar,
             text=f"Semaine {today.isocalendar()[1]} | {today:%d/%m/%Y}",
             bg=bar_bg, fg="#6c7086",
             font=("Consolas",10)).pack(side="left", padx=12)
    # Right-aligned buttons, packed from the right edge inwards
    right_buttons = (
        ("A propos", "#6c7086", 9, 4, self._show_about),
        ("Mon MDP", "#fab387", 9, 4, self._change_my_password),
        ("Settings", fg, 10, 12, self._open_settings),
    )
    for caption, colour, size, gap, handler in right_buttons:
        tk.Button(title_bar, text=caption, bg=btn, fg=colour,
                  font=("Consolas",size), pady=2,
                  command=handler).pack(side="right", padx=gap)

    # Notebook (tabs)
    self.nb = ttk.Notebook(self.root)
    self.nb.pack(fill="both", expand=True, padx=6, pady=4)
    for attr in ("tab_servers", "tab_patch", "tab_post", "tab_report", "tab_audit"):
        setattr(self, attr, tk.Frame(self.nb, bg=bg))
    common_tabs = (
        (self.tab_servers, "1. Selection serveurs"),
        (self.tab_patch, "2. Patch"),
        (self.tab_post, "3. Post-patching"),
        (self.tab_report, "4. Rapport"),
    )
    for page, caption in common_tabs:
        self.nb.add(page, text=caption)
    if self.current_user["role"] == "admin":
        self.nb.add(self.tab_audit, text="5. Audit")
        self.tab_users = tk.Frame(self.nb, bg=bg)
        self.nb.add(self.tab_users, text="6. Utilisateurs")

    self._build_tab_servers()
    self._build_tab_patch()
    self._build_tab_post()
    self._build_tab_report()
    if self.current_user["role"] == "admin":
        self._build_tab_audit()
        self._build_tab_users()

    # Viewer role: patch and post-patching tabs are disabled
    if self.current_user["role"] == "viewer":
        for page in (self.tab_patch, self.tab_post):
            self.nb.tab(page, state="disabled")
# ==========================================================================
# ONGLET 1 — SÉLECTION SERVEURS
# ==========================================================================
def _build_tab_servers(self):
    """Build tab 1: Excel source, filters, server list and prerequisite buttons.

    Creates, among others, ``excel_var``, ``sheet_var``/``sheet_cb``, the
    ``filter_*_var``/``filter_*_cb`` pairs, ``srv_tree``, ``count_lbl``,
    ``prereq_btn``, ``prereq_status`` and ``snap_btn``.
    ``filter_patcher_cb`` is ``None`` for non-admin users.
    """
    tab = self.tab_servers
    BG = self.BG
    BG2 = self.BG2
    FG = self.FG
    ACCENT = self.ACCENT
    BTN = self.BTN

    # ── Row 1: Excel file ─────────────────────────────────────────────────
    ef = tk.Frame(tab, bg=BG)
    ef.pack(fill="x", padx=10, pady=6)
    tk.Label(ef, text="📊 Fichier Excel :", bg=BG, fg=FG,
             font=("Consolas", 10)).pack(side="left")
    self.excel_var = tk.StringVar(value=self.settings.get("excel_file", ""))
    tk.Entry(ef, textvariable=self.excel_var, bg=BG2, fg=FG,
             font=("Consolas", 10), insertbackground=FG,
             width=55).pack(side="left", padx=6)
    tk.Button(ef, text="Parcourir", bg=BTN, fg=FG, font=("Consolas", 9),
              command=self._browse_excel).pack(side="left", padx=2)
    # Sheet selector
    tk.Label(ef, text="Feuille :", bg=BG, fg=FG,
             font=("Consolas", 10)).pack(side="left", padx=(12, 2))
    self.sheet_var = tk.StringVar()
    self.sheet_cb = ttk.Combobox(ef, textvariable=self.sheet_var,
                                 state="readonly", width=10)
    self.sheet_cb.pack(side="left", padx=2)
    self.sheet_cb.bind("<<ComboboxSelected>>", lambda e: self._load_sheet())
    # Quick week navigation. The three buttons used to have empty captions,
    # which made them indistinguishable blank squares.
    tk.Button(ef, text="◀", bg=BTN, fg=FG, font=("Consolas", 10),
              width=2, command=self._prev_week).pack(side="left", padx=1)
    tk.Button(ef, text="▶", bg=BTN, fg=FG, font=("Consolas", 10),
              width=2, command=self._next_week).pack(side="left", padx=1)
    tk.Label(ef, text="S. courante", bg=BG, fg="#6c7086",
             font=("Consolas", 9)).pack(side="left", padx=2)
    tk.Button(ef, text="●", bg=BTN, fg=self.ACCENT, font=("Consolas", 10),
              width=2, command=self._goto_current_week).pack(side="left", padx=1)
    tk.Button(ef, text="Charger", bg="#1e66f5", fg="white",
              font=("Consolas", 10, "bold"), pady=3,
              command=self._load_excel).pack(side="left", padx=8)
    # Current-week / view-only indicator
    self.week_mode_lbl = tk.Label(ef, text="", bg=self.BG,
                                  font=("Consolas", 10, "bold"))
    self.week_mode_lbl.pack(side="left", padx=8)

    # ── Row 2: filters ────────────────────────────────────────────────────
    ff = tk.Frame(tab, bg=BG)
    ff.pack(fill="x", padx=10, pady=4)
    tk.Label(ff, text="Filtres :", bg=BG, fg=ACCENT,
             font=("Consolas", 10, "bold")).pack(side="left")
    # Today only
    self.filter_today_var = tk.BooleanVar(value=False)
    tk.Checkbutton(ff, text="📅 Aujourd'hui uniquement",
                   variable=self.filter_today_var,
                   bg=BG, fg=self.YELLOW, selectcolor=BG2,
                   activebackground=BG,
                   font=("Consolas", 10)).pack(side="left", padx=8)
    # Operator filter: only shown to admins
    self.filter_patcher_var = tk.StringVar(value="Tous")
    if self.current_user["role"] == "admin":
        tk.Label(ff, text="Intervenant :", bg=BG, fg=self.YELLOW,
                 font=("Consolas", 10, "bold")).pack(side="left", padx=(8, 2))
        self.filter_patcher_cb = ttk.Combobox(
            ff, textvariable=self.filter_patcher_var,
            state="readonly", width=14)
        self.filter_patcher_cb.pack(side="left", padx=2)
        self.filter_patcher_cb.bind("<<ComboboxSelected>>",
                                    lambda e: self._apply_filters())
    else:
        self.filter_patcher_cb = None  # non-admin: no operator filter
    for label, attr in [("Env", "filter_env"), ("Domaine", "filter_domain"),
                        ("Application", "filter_app")]:
        tk.Label(ff, text=f"{label}:", bg=BG, fg=FG,
                 font=("Consolas", 10)).pack(side="left", padx=(8, 2))
        var = tk.StringVar(value="Tous")
        setattr(self, f"{attr}_var", var)
        cb = ttk.Combobox(ff, textvariable=var, state="readonly", width=14)
        setattr(self, f"{attr}_cb", cb)
        cb.pack(side="left", padx=2)
    tk.Button(ff, text="Rechercher", bg=BTN, fg=ACCENT,
              font=("Consolas", 10, "bold"), pady=3,
              command=self._apply_filters).pack(side="left", padx=6)

    # ── Row 3: selection actions ──────────────────────────────────────────
    af = tk.Frame(tab, bg=BG)
    af.pack(fill="x", padx=10, pady=2)
    for txt, cmd in [("✓ Tout (accordé)", self._select_all_ok),
                     ("✗ Aucun", self._select_none),
                     ("↕ Inverser", self._invert)]:
        tk.Button(af, text=txt, bg=BTN, fg=FG, font=("Consolas", 9),
                  command=cmd).pack(side="left", padx=3)
    self.count_lbl = tk.Label(af, text="", bg=BG, fg=self.YELLOW,
                              font=("Consolas", 9))
    self.count_lbl.pack(side="right", padx=10)

    # ── Server tree ───────────────────────────────────────────────────────
    tree_frame = tk.Frame(tab, bg=BG)
    tree_frame.pack(fill="both", expand=True, padx=10, pady=4)
    cols = ("sel", "accord", "serveur", "intervenant", "env", "domaine",
            "app", "date_patch", "snap", "statut")
    self.srv_tree = ttk.Treeview(tree_frame, columns=cols,
                                 show="headings", height=22)
    hdrs = {"sel": "", "accord": "Accord", "serveur": "Serveur",
            "intervenant": "Intervenant", "env": "Env",
            "domaine": "Domaine", "app": "Application",
            "date_patch": "Date patch",
            "snap": "Snapshot", "statut": "Statut"}
    widths = {"sel": 30, "accord": 70, "serveur": 180, "intervenant": 90,
              "env": 80, "domaine": 120, "app": 150, "date_patch": 90,
              "snap": 80, "statut": 100}
    centered = ("sel", "accord", "snap", "statut")
    for c in cols:
        self.srv_tree.heading(c, text=hdrs[c])
        self.srv_tree.column(c, width=widths[c],
                             anchor="center" if c in centered else "w")
    self.srv_tree.tag_configure("no_accord", foreground="#585b70", background="#1e1e2e")
    self.srv_tree.tag_configure("ok", foreground=self.GREEN)
    self.srv_tree.tag_configure("warn", foreground=self.YELLOW)
    self.srv_tree.tag_configure("ko", foreground=self.RED)
    self.srv_tree.tag_configure("selected", foreground=self.FG, background="#313244")
    self.srv_tree.tag_configure("normal", foreground=self.FG)
    self.srv_tree.tag_configure("already_patched", foreground="#1e1e2e", background="#40a02b")
    vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.srv_tree.yview)
    hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.srv_tree.xview)
    self.srv_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
    # Scrollbars are packed BEFORE the expanding tree: pack hands out space in
    # packing order, so a bottom scrollbar packed last was left with no room
    # and never showed.
    hsb.pack(side="bottom", fill="x")
    vsb.pack(side="right", fill="y")
    self.srv_tree.pack(side="left", fill="both", expand=True)
    self.srv_tree.bind("<ButtonRelease-1>", self._toggle_srv)

    # ── Prerequisite + snapshot buttons ───────────────────────────────────
    prereq_frame = tk.Frame(tab, bg=BG)
    prereq_frame.pack(fill="x", padx=10, pady=4)
    self.prereq_btn = tk.Button(prereq_frame, text="Verifier prerequis",
                                bg="#e64553", fg="white",
                                font=("Consolas", 12, "bold"),
                                pady=6, command=self._check_prerequisites)
    self.prereq_btn.pack(side="left", padx=4)
    self.prereq_status = tk.Label(prereq_frame, text="", bg=BG, fg=self.FG,
                                  font=("Consolas", 10))
    self.prereq_status.pack(side="left", padx=12)
    self.snap_btn = tk.Button(prereq_frame, text="Prendre snapshot vSphere",
                              bg="#313244", fg="#6c7086",
                              font=("Consolas", 10),
                              pady=4, state="disabled",
                              command=self._take_snapshots)
    self.snap_btn.pack(side="left", padx=4)
    tk.Button(prereq_frame,
              text="▶ Aller au Patch →",
              bg="#40a02b", fg="white", font=("Consolas", 11, "bold"),
              pady=5,
              command=lambda: self.nb.select(self.tab_patch)).pack(side="right", padx=4)
# ==========================================================================
# ONGLET 2 — PATCH
# ==========================================================================
def _build_tab_patch(self):
    """Build tab 2: patch options on the left, live log and progress on the right.

    Creates ``preset_var``, ``packages_var``/``packages_entry``,
    ``custom_excludes_var``/``custom_excludes_entry``,
    ``final_cmd_var``/``final_cmd_entry``, ``exclude_kernel_var``,
    ``dryrun_var``, ``cybpwd_var``, ``go_btn``, ``stop_btn``, ``patch_log``,
    ``prog_lbl`` and ``prog_var``.
    """
    tab = self.tab_patch
    BG = self.BG
    BG2 = self.BG2
    FG = self.FG
    ACCENT = self.ACCENT
    BTN = self.BTN
    top = tk.Frame(tab, bg=BG)
    top.pack(fill="both", expand=True, padx=10, pady=8)

    # Left column: options (scrollable)
    left_canvas = tk.Canvas(top, bg=BG, highlightthickness=0, width=380)
    left_sb = ttk.Scrollbar(top, orient="vertical", command=left_canvas.yview)
    left = tk.Frame(left_canvas, bg=BG)
    left.bind("<Configure>",
              lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all")))
    left_canvas.create_window((0, 0), window=left, anchor="nw")
    left_canvas.configure(yscrollcommand=left_sb.set)
    left_canvas.pack(side="left", fill="y")
    left_sb.pack(side="left", fill="y")

    # Mouse wheel: the handler stays application-wide but only reacts when
    # the event comes from the options panel. The previous unconditional
    # binding scrolled this panel while the user was scrolling the log or any
    # other tab.
    canvas_path = str(left_canvas)

    def _on_wheel(event):
        # str() also covers the case where event.widget is a plain path name
        path = str(event.widget)
        if path == canvas_path or path.startswith(canvas_path + "."):
            left_canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")

    left_canvas.bind_all("<MouseWheel>", _on_wheel, add="+")

    tk.Label(left, text="OPTIONS PATCH", bg=BTN, fg=ACCENT,
             font=("Consolas", 11, "bold"), padx=8, pady=4).pack(fill="x")
    # Package preset
    tk.Label(left, text="Preset packages A PATCHER :", bg=BG, fg=FG,
             font=("Consolas", 10)).pack(anchor="w", pady=(8, 2))
    self.preset_var = tk.StringVar(value=list(PRESET_PACKAGES.keys())[0])
    preset_cb = ttk.Combobox(left, textvariable=self.preset_var,
                             values=list(PRESET_PACKAGES.keys()),
                             state="readonly", width=30)
    preset_cb.pack(anchor="w")
    preset_cb.bind("<<ComboboxSelected>>", self._on_preset)
    # Packages to patch
    tk.Label(left, text="Packages specifiques a PATCHER :", bg=BG, fg=FG,
             font=("Consolas", 10)).pack(anchor="w", pady=(8, 2))
    self.packages_var = tk.StringVar(value="")
    self.packages_entry = tk.Entry(left, textvariable=self.packages_var,
                                   bg=BG2, fg=FG, font=("Consolas", 10),
                                   insertbackground=FG, width=32)
    self.packages_entry.pack(anchor="w")
    tk.Label(left, text="Ex: gnutls libsoup rhc",
             bg=BG, fg="#6c7086", font=("Consolas", 9)).pack(anchor="w")
    # Packages to exclude
    tk.Label(left, text="Packages specifiques a EXCLURE :", bg=BG, fg=FG,
             font=("Consolas", 10)).pack(anchor="w", pady=(8, 2))
    self.custom_excludes_var = tk.StringVar(value="")
    self.custom_excludes_entry = tk.Entry(left, textvariable=self.custom_excludes_var,
                                          bg=BG2, fg=FG, font=("Consolas", 10),
                                          insertbackground=FG, width=32)
    self.custom_excludes_entry.pack(anchor="w")
    tk.Label(left, text="Ex: kernel* podman* docker*",
             bg=BG, fg="#6c7086", font=("Consolas", 9)).pack(anchor="w")
    # Final command (editable)
    tk.Label(left, text="Commande finale (editable) :", bg=BG, fg=self.YELLOW,
             font=("Consolas", 10, "bold")).pack(anchor="w", pady=(10, 2))
    self.final_cmd_var = tk.StringVar(value="")
    self.final_cmd_entry = tk.Entry(left, textvariable=self.final_cmd_var,
                                    bg="#181825", fg=self.YELLOW,
                                    font=("Consolas", 9),
                                    insertbackground=self.YELLOW, width=48)
    self.final_cmd_entry.pack(anchor="w")
    tk.Button(left, text="Generer la commande", bg=BTN, fg=FG,
              font=("Consolas", 9),
              command=self._preview_cmd).pack(anchor="w", pady=(4, 0))
    # Help box
    help_frame = tk.Frame(left, bg="#181825", pady=4, padx=8)
    help_frame.pack(fill="x", pady=(6, 0))
    tk.Label(help_frame, text="Preset = commande predeterminee",
             bg="#181825", fg="#a6e3a1", font=("Consolas", 8)).pack(anchor="w")
    tk.Label(help_frame, text="Personnalise = packages a patcher ET/OU a exclure",
             bg="#181825", fg="#f9e2af", font=("Consolas", 8)).pack(anchor="w")
    tk.Label(help_frame, text="La commande finale est toujours editable avant GO",
             bg="#181825", fg="#6c7086", font=("Consolas", 8)).pack(anchor="w")
    # Options
    self.exclude_kernel_var = tk.BooleanVar(value=True)
    self.dryrun_var = tk.BooleanVar(value=True)
    tk.Checkbutton(left, text="☑ Exclure kernel (recommandé)",
                   variable=self.exclude_kernel_var,
                   bg=BG, fg=self.YELLOW, selectcolor=BG2,
                   activebackground=BG,
                   font=("Consolas", 10, "bold")).pack(anchor="w", pady=(10, 2))
    tk.Checkbutton(left, text="🔍 Dry Run (simulation)",
                   variable=self.dryrun_var,
                   bg=BG, fg=self.ORANGE, selectcolor=BG2,
                   activebackground=BG,
                   font=("Consolas", 10, "bold")).pack(anchor="w", pady=2)
    # CyberArk password. show="*" masks the input like the other password
    # fields of the application; the former show="" displayed it in clear.
    tk.Label(left, text="Mot de passe CyberArk (prod) :",
             bg=BG, fg=FG, font=("Consolas", 10)).pack(anchor="w", pady=(12, 2))
    self.cybpwd_var = tk.StringVar()
    tk.Entry(left, textvariable=self.cybpwd_var, show="*",
             bg=BG2, fg=FG, font=("Consolas", 10),
             insertbackground=FG, width=28).pack(anchor="w")
    # GO button
    self.go_btn = tk.Button(left, text="🚀 GO PATCH",
                            bg="#d20f39", fg="white",
                            font=("Consolas", 16, "bold"),
                            pady=12, padx=20,
                            command=self._go_patch)
    self.go_btn.pack(fill="x", pady=16)
    self.stop_btn = tk.Button(left, text="⏹ STOP",
                              bg="#6c7086", fg="white",
                              font=("Consolas", 11, "bold"),
                              pady=6, state="disabled",
                              command=self._stop)
    self.stop_btn.pack(fill="x")

    # Right column: real-time log
    right = tk.Frame(top, bg=BG)
    right.pack(side="left", fill="both", expand=True)
    tk.Label(right, text="DÉROULÉ EN TEMPS RÉEL",
             bg=BTN, fg=ACCENT,
             font=("Consolas", 11, "bold"), pady=4).pack(fill="x")
    self.patch_log = scrolledtext.ScrolledText(
        right, height=28, bg="#11111b", fg=FG,
        font=("Consolas", 9), insertbackground=FG, state="disabled")
    self.patch_log.pack(fill="both", expand=True, pady=4)
    for tag, color in [("ok", self.GREEN), ("ko", self.RED),
                       ("warn", self.YELLOW), ("info", ACCENT),
                       ("cmd", "#cba6f7")]:
        self.patch_log.tag_configure(tag, foreground=color)

    # Progress bar
    pf = tk.Frame(tab, bg=BG)
    pf.pack(fill="x", padx=10, pady=4)
    self.prog_lbl = tk.Label(pf, text="En attente...",
                             bg=BG, fg=FG, font=("Consolas", 9))
    self.prog_lbl.pack(side="left", padx=4)
    self.prog_var = tk.DoubleVar()
    ttk.Progressbar(pf, variable=self.prog_var,
                    maximum=100, length=600).pack(side="left", fill="x",
                                                  expand=True, padx=4)
# ==========================================================================
# ONGLET 3 — POST-PATCHING
# ==========================================================================
def _build_tab_post(self):
    """Build tab 3: post-patching action buttons and their log area.

    Creates ``post_log`` with the ``ok``/``ko``/``warn``/``info`` colour tags.
    """
    page = self.tab_post
    bg, fg, accent, banner_bg = self.BG, self.FG, self.ACCENT, self.BTN

    banner = tk.Label(page, text="POST-PATCHING", bg=banner_bg, fg=accent,
                      font=("Consolas",12,"bold"), pady=6)
    banner.pack(fill="x", padx=10, pady=8)

    def button_row(gap, actions):
        """Add one horizontal row of white-on-colour action buttons."""
        bar = tk.Frame(page, bg=bg)
        bar.pack(fill="x", padx=10, pady=gap)
        for caption, colour, handler in actions:
            tk.Button(bar, text=caption, bg=colour, fg="white",
                      font=("Consolas",11,"bold"), pady=6,
                      command=handler).pack(side="left", padx=4)

    # First row: per-server operations
    button_row(4, (
        ("🔍 Check post-patching", "#1e66f5", self._post_patch_check),
        ("🗑 Supprimer anciens kernels", "#e64553", self._remove_old_kernels),
        ("↩ Undo yum", "#fe640b", self._undo_yum),
        ("🔄 Reboot serveurs sélectionnés", "#8839ef", self._reboot_servers),
        ("🟩 Marquer patchés dans Excel", "#40a02b", self._ask_mark_patched),
    ))
    # Second row: snapshot housekeeping
    button_row(2, (
        ("🗑 Supprimer snapshots > 3 jours", "#e64553", self._delete_old_snapshots),
    ))

    self.post_log = scrolledtext.ScrolledText(
        page, bg="#11111b", fg=fg,
        font=("Consolas",9), insertbackground=fg, state="disabled")
    self.post_log.pack(fill="both", expand=True, padx=10, pady=4)
    tag_colours = {"ok": self.GREEN, "ko": self.RED,
                   "warn": self.YELLOW, "info": accent}
    for tag_name, colour in tag_colours.items():
        self.post_log.tag_configure(tag_name, foreground=colour)
# ==========================================================================
# ONGLET 4 — RAPPORT PATCHING (style dashboard)
# ==========================================================================
def _build_tab_report(self):
    """Build tab 4: report header, KPI counters, per-server table and raw log.

    Creates ``kpi_widgets`` (KPI key -> value label), ``report_tree`` and
    ``report_txt``.
    """
    tab = self.tab_report
    BG = self.BG
    FG = self.FG
    ACCENT = self.ACCENT
    BTN = self.BTN

    # ── Title bar + buttons ───────────────────────────────────────────────
    hf = tk.Frame(tab, bg=BTN, pady=6)
    hf.pack(fill="x", padx=0, pady=0)
    tk.Label(hf, text="RAPPORT DE PATCHING",
             bg=BTN, fg=ACCENT,
             font=("Consolas", 12, "bold")).pack(side="left", padx=12)
    bf = tk.Frame(hf, bg=BTN)
    bf.pack(side="right", padx=8)
    tk.Button(bf, text="Generer", bg="#40a02b", fg="white",
              font=("Consolas", 10, "bold"), pady=3,
              command=self._generate_report).pack(side="left", padx=4)
    tk.Button(bf, text="CSV", bg=BTN, fg=FG,
              font=("Consolas", 10), pady=3,
              command=self._export_csv).pack(side="left", padx=4)
    tk.Button(bf, text="DokuWiki", bg=BTN, fg=FG,
              font=("Consolas", 10), pady=3,
              command=self._export_dokuwiki).pack(side="left", padx=4)

    # ── KPI widgets (row of counters) ─────────────────────────────────────
    kpi_frame = tk.Frame(tab, bg=BG)
    kpi_frame.pack(fill="x", padx=10, pady=8)
    self.kpi_widgets = {}
    kpi_defs = [
        ("total", "Total traités", "#313244", FG),
        ("ok", "Connexion OK", "#1a4a2e", self.GREEN),
        ("patched", "Patchés", "#1a4a2e", self.GREEN),
        ("uptodate", "Déjà à jour", "#2a2a3e", ACCENT),
        ("reboot", "Reboot requis", "#4a3000", self.YELLOW),
        ("ko", "KO / Auth", "#4a1a1a", self.RED),
    ]
    for key, label, bg_col, fg_col in kpi_defs:
        frame = tk.Frame(kpi_frame, bg=bg_col, padx=16, pady=10,
                         relief="flat", bd=0)
        frame.pack(side="left", fill="x", expand=True, padx=4)
        # Value (empty until a report is generated)
        val_lbl = tk.Label(frame, text="", bg=bg_col, fg=fg_col,
                           font=("Consolas", 24, "bold"))
        val_lbl.pack()
        # Caption
        tk.Label(frame, text=label, bg=bg_col, fg=fg_col,
                 font=("Consolas", 9)).pack()
        self.kpi_widgets[key] = val_lbl

    # ── Detailed results table ────────────────────────────────────────────
    tk.Label(tab, text="DETAIL PAR SERVEUR",
             bg=BG, fg=ACCENT,
             font=("Consolas", 10, "bold")).pack(anchor="w", padx=12, pady=(4, 2))
    tree_frame = tk.Frame(tab, bg=BG)
    tree_frame.pack(fill="both", expand=True, padx=10, pady=4)
    rcols = ("serveur", "env", "domaine", "accord", "snap",
             "connexion", "patch", "reboot", "detail")
    self.report_tree = ttk.Treeview(tree_frame, columns=rcols,
                                    show="headings", height=14)
    rhdrs = {
        "serveur": "Serveur",
        "env": "Env",
        "domaine": "Domaine",
        "accord": "Accord",
        "snap": "Snapshot",
        "connexion": "Connexion",
        "patch": "Patch",
        "reboot": "Reboot",
        "detail": "Detail packages",
    }
    rwidths = {
        "serveur": 200, "env": 80, "domaine": 100, "accord": 70,
        "snap": 80, "connexion": 90, "patch": 100, "reboot": 70, "detail": 280,
    }
    for c in rcols:
        self.report_tree.heading(c, text=rhdrs[c])
        self.report_tree.column(
            c, width=rwidths[c],
            anchor="center" if c not in ("serveur", "detail") else "w")
    self.report_tree.tag_configure("ok", foreground=self.GREEN)
    self.report_tree.tag_configure("ko", foreground=self.RED)
    self.report_tree.tag_configure("warn", foreground=self.YELLOW)
    self.report_tree.tag_configure("patched", foreground=self.GREEN, background="#1a2e1a")
    self.report_tree.tag_configure("reboot", foreground=self.YELLOW, background="#2e2a00")
    self.report_tree.tag_configure("uptodate", foreground=ACCENT)
    vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.report_tree.yview)
    hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.report_tree.xview)
    self.report_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
    # Scrollbars first, expanding tree last: pack allocates space in packing
    # order, so a bottom scrollbar packed after the tree received no room.
    hsb.pack(side="bottom", fill="x")
    vsb.pack(side="right", fill="y")
    self.report_tree.pack(side="left", fill="both", expand=True)

    # ── Raw text log (optional, compact) ──────────────────────────────────
    tk.Label(tab, text="LOG BRUT",
             bg=BG, fg="#6c7086",
             font=("Consolas", 9)).pack(anchor="w", padx=12, pady=(4, 0))
    self.report_txt = scrolledtext.ScrolledText(
        tab, height=6, bg="#11111b", fg=FG,
        font=("Consolas", 8), insertbackground=FG, state="disabled")
    self.report_txt.pack(fill="x", padx=10, pady=(0, 6))
    for tag, color in [("ok", self.GREEN), ("ko", self.RED),
                       ("warn", self.YELLOW), ("info", ACCENT),
                       ("title", "#cba6f7")]:
        self.report_txt.tag_configure(
            tag, foreground=color,
            font=("Consolas", 9, "bold") if tag == "title" else ("Consolas", 8))
# ==========================================================================
# HELPERS LOG
# ==========================================================================
def _log(self, widget, msg, tag="info"):
    """Append a timestamped line to a read-only text widget.

    The write is scheduled through ``root.after(0, ...)`` rather than done
    directly, so the method only queues the update.

    Args:
        widget: text widget to write to (kept ``disabled`` between writes).
        msg: message text; a ``[HH:MM:SS]`` prefix and a newline are added.
        tag: text tag applied to the inserted line.
    """
    def write_line():
        stamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{stamp}] {msg}\n"
        # The widget is read-only: unlock it just for the insertion
        widget.configure(state="normal")
        widget.insert("end", line, tag)
        widget.see("end")
        widget.configure(state="disabled")

    self.root.after(0, write_line)
def _log_patch(self, msg, tag="info"):
    """Write *msg* to the log area of the Patch tab (see ``_log``)."""
    target = self.patch_log
    self._log(target, msg, tag=tag)
def _log_post(self, msg, tag="info"):
    """Write *msg* to the log area of the Post-patching tab (see ``_log``)."""
    target = self.post_log
    self._log(target, msg, tag=tag)
# ==========================================================================
# PREREQUIS — SSH + Disque + Satellite + Snapshot
# ==========================================================================
def _check_prerequisites(self):
    """Run the pre-patch checks on every selected server that has approval.

    For each server a background thread checks, over SSH: connectivity,
    free space on / (minimum 1200 MB) and /var/log (minimum 500 MB), and
    the output of ``subscription-manager status``.  When the environment
    of a selected server contains "prod" and no CyberArk password is known
    yet, the password is read from the patch tab field or asked for in a
    dialog.  The collected results are always handed to
    ``_show_prereq_results`` on the Tk event loop, even when the worker
    fails part-way through, so the button never stays disabled.
    """
    selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
    if not selected:
        messagebox.showwarning("Prerequis", "Aucun serveur selectionne avec accord=OUI")
        return
    # Reload the SSH keys
    self._reload_keys()
    # Servers whose environment contains "prod" need the PSMP password
    has_prod = any("prod" in s.get("env", "").lower() for s in selected)
    if has_prod and not self.cyb_pwd:
        # First try the CyberArk password field of the patch tab
        pwd = self.cybpwd_var.get().strip() if hasattr(self, 'cybpwd_var') else ""
        if pwd:
            self.cyb_pwd = pwd
        else:
            # Otherwise ask for it in a dialog centered on the main window
            dlg = tk.Toplevel(self.root)
            dlg.title("CyberArk PSMP")
            dlg.configure(bg="#1e1e2e")
            dlg.resizable(False, False)
            result_pwd = [None]
            tk.Label(dlg, text=f"Mot de passe CyberArk ({self.settings.get('cybr_user', 'CYBP01336')}) :",
                     bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(padx=20, pady=(15, 5))
            pwd_var = tk.StringVar()
            pwd_entry = tk.Entry(dlg, textvariable=pwd_var, show="*", bg="#2a2a3e", fg="#cdd6f4",
                                 font=("Consolas", 11), insertbackground="#cdd6f4", width=30)
            pwd_entry.pack(padx=20, pady=5)
            pwd_entry.focus_set()

            def _ok(event=None):
                result_pwd[0] = pwd_var.get().strip()
                dlg.destroy()

            pwd_entry.bind("<Return>", _ok)
            tk.Button(dlg, text="OK", bg="#40a02b", fg="white", font=("Consolas", 10, "bold"),
                      padx=20, command=_ok).pack(pady=10)
            center_window(dlg, 400, 140, parent=self.root)
            dlg.grab_set()
            dlg.wait_window()
            pwd = result_pwd[0]
            if not pwd:
                messagebox.showwarning("Prerequis", "Mot de passe PSMP requis pour les serveurs Production")
                return
            self.cyb_pwd = pwd
            if hasattr(self, 'cybpwd_var'):
                self.cybpwd_var.set(pwd)
    self.prereq_btn.configure(state="disabled", text="Verification en cours...")
    self.prereq_status.configure(text="")
    self.audit("PREREQ_START", f"{len(selected)} serveurs (prod={has_prod})")
    self._log_patch(f"Prerequis : {len(selected)} serveurs a verifier...", "info")

    def measure_free_mb(client, mount, minimum):
        """Log and return the free space of *mount* in MB (-1 when unreadable)."""
        self._log_patch(f" Disque {mount} ...", "info")
        out, _ = run_cmd(client, "df -BM " + mount + " | awk 'NR==2{print $4}' | tr -d 'M'", 10)
        try:
            free_mb = int(out.strip())
        except (ValueError, AttributeError):
            free_mb = -1
        if free_mb < 0:
            # The summary does not block on an unknown value, so do not log it as KO
            self._log_patch(f" Disque {mount} : espace libre non mesurable", "warn")
        elif free_mb >= minimum:
            self._log_patch(f" Disque {mount} : {free_mb} Mo libre (OK >= {minimum})", "ok")
        else:
            self._log_patch(f" Disque {mount} : {free_mb} Mo libre (KO < {minimum})", "ko")
        return free_mb

    def worker():
        results = {}
        total = len(selected)
        try:
            for idx, s in enumerate(selected):
                server = s["server"]
                env = s.get("env", "")
                r = {"ssh": False, "disk_root": None, "disk_log": None, "satellite": None,
                     "is_vm": server.lower().startswith("v")}
                # Registered up front so a failure below still leaves an entry
                results[server] = r
                self.root.after(0, lambda sv=server, i=idx: self.prereq_status.configure(
                    text=f"[{i+1}/{total}] {sv}"))
                self._log_patch(f"\n[{idx+1}/{total}] {server} ({env})", "info")
                # 1. SSH check
                self._log_patch(" SSH...", "info")
                method = "PSMP" if "prod" in env.lower() else "cle SSH"
                client = None
                try:
                    client, eff = ssh_connect(server, env, self.settings,
                                              self.pkey, self.pkey2, self.cyb_pwd)
                    if not client:
                        r["error"] = "Connexion SSH impossible"
                        self._log_patch(f" SSH ({method}) ECHEC", "ko")
                        continue
                    r["ssh"] = True
                    self._log_patch(f" SSH ({method}) OK : {eff}", "ok")
                    # 2. Disk checks
                    r["disk_root"] = measure_free_mb(client, "/", 1200)
                    r["disk_log"] = measure_free_mb(client, "/var/log", 500)
                    # 3. Satellite check
                    self._log_patch(" Satellite ...", "info")
                    sat_out, _ = run_cmd(client, "subscription-manager status 2>/dev/null | head -5", 15)
                    sat_low = sat_out.lower() if sat_out else ""
                    if "current" in sat_low or "valide" in sat_low:
                        r["satellite"] = True
                        self._log_patch(" Satellite : OK (enregistre)", "ok")
                    elif "unknown" in sat_low:
                        r["satellite"] = False
                        self._log_patch(" Satellite : NON enregistre", "ko")
                    else:
                        r["satellite"] = None
                        self._log_patch(" Satellite : N/A (subscription-manager absent)", "warn")
                except Exception as exc:
                    # Fail closed: a server whose checks crashed is reported as
                    # an SSH failure so it is never considered ready.
                    r["ssh"] = False
                    r["error"] = f"Erreur pendant la verification : {exc}"
                    self._log_patch(f" Verification interrompue : {exc}", "ko")
                finally:
                    if client:
                        client.close()
        finally:
            # Always show the results so the button is re-enabled
            self.root.after(0, lambda: self._show_prereq_results(results, selected))

    threading.Thread(target=worker, daemon=True).start()
def _show_prereq_results(self, results, selected):
    """Summarise the prerequisite checks, deselect blocked servers and notify the user.

    Args:
        results: mapping server name -> result dict; the keys read here are
            "ssh", "disk_root", "disk_log" and "satellite".
        selected: list of the server dicts that were checked; each receives
            a "prereq_status" entry ("OK", "SSH_KO", "DISK_KO" or "SAT_KO").
    """
    self.prereq_btn.configure(state="normal", text="Verifier prerequis")
    banner = "=" * 50
    self._log_patch(f"\n{banner}", "info")
    self._log_patch("RESUME PREREQUIS", "info")
    self._log_patch(banner, "info")

    def measured(value):
        """True when a disk value is a real measurement (not None / -1)."""
        return isinstance(value, int) and value >= 0

    # Build the report
    ok_servers = []
    ko_servers = []
    warn_servers = []
    for s in selected:
        server = s["server"]
        r = results.get(server, {})
        disk_root = r.get("disk_root")
        disk_log = r.get("disk_log")
        if not r.get("ssh"):
            ko_servers.append((server, "SSH connexion impossible"))
            s["prereq_status"] = "SSH_KO"
        elif measured(disk_root) and disk_root < 1200:
            ko_servers.append((server, f"/ : {disk_root} Mo libre (min 1200 Mo)"))
            s["prereq_status"] = "DISK_KO"
        elif measured(disk_log) and disk_log < 500:
            ko_servers.append((server, f"/var/log : {disk_log} Mo libre (min 500 Mo)"))
            s["prereq_status"] = "DISK_KO"
        elif r.get("satellite") is False:
            ko_servers.append((server, "Satellite : non enregistre"))
            s["prereq_status"] = "SAT_KO"
        else:
            ok_servers.append(server)
            s["prereq_status"] = "OK"
            # Non-blocking findings: one warning entry per server
            reasons = []
            if not (measured(disk_root) and measured(disk_log)):
                reasons.append("espace disque non mesurable")
            if r.get("satellite") is None:
                reasons.append("subscription-manager absent (Debian/physique ?)")
            if reasons:
                warn_servers.append((server, " ; ".join(reasons)))
    # Log the summary
    for srv in ok_servers:
        self._log_patch(f" OK : {srv}", "ok")
    for srv, reason in warn_servers:
        self._log_patch(f" WARN : {srv} : {reason}", "warn")
    for srv, reason in ko_servers:
        self._log_patch(f" KO : {srv} : {reason}", "ko")
    self._log_patch(f"\nTotal : {len(ok_servers)} OK, {len(warn_servers)} WARN, {len(ko_servers)} KO", "info")
    # Result dialog
    msg = f"Prerequis verifies : {len(selected)} serveurs\n\n"
    msg += f"OK : {len(ok_servers)}\n"
    if warn_servers:
        msg += f"Avertissements : {len(warn_servers)}\n"
    if ko_servers:
        msg += f"BLOQUANTS : {len(ko_servers)}\n\n"
        msg += "Serveurs bloques :\n"
        for srv, reason in ko_servers:
            msg += f" {srv} : {reason}\n"
        # The blocked servers are deselected just below, so say so
        msg += "\nLes serveurs bloques ont ete deselectionnes automatiquement."
        blocked = {srv for srv, _ in ko_servers}
        for s in self.servers:
            if s["server"] in blocked:
                s["selected"] = False
        self._apply_filters()
        messagebox.showwarning("Prerequis — problemes detectes", msg)
        self.prereq_status.configure(text=f"{len(ko_servers)} serveurs bloques", fg=self.RED)
    else:
        messagebox.showinfo("Prerequis OK", msg)
        self.prereq_status.configure(text=f"{len(ok_servers)} serveurs OK", fg=self.GREEN)
    # Enable the snapshot button when at least one OK server name starts with "v"
    vm_count = sum(1 for s in selected if s.get("prereq_status") == "OK"
                   and s["server"].lower().startswith("v"))
    if vm_count > 0:
        self.snap_btn.configure(state="normal", bg="#e64553", fg="white")
    self.audit("PREREQ_DONE", f"OK={len(ok_servers)} KO={len(ko_servers)} WARN={len(warn_servers)}")
# ==========================================================================
# SNAPSHOT — force sans snap
# ==========================================================================
def _force_patch_without_snap(self, server_name):
    """Ask for a justification before patching a VM that has no snapshot.

    Returns the justification text (at least 5 characters), or None when
    the dialog is cancelled or closed.  An accepted justification is also
    written to the audit trail.
    """
    bg, fg = "#1e1e2e", "#cdd6f4"
    answer = [None]

    window = tk.Toplevel(self.root)
    window.title("Patching sans snapshot")
    window.configure(bg=bg)
    window.resizable(False, False)

    header = tk.Label(window, text="ATTENTION — VM sans snapshot", bg="#181825",
                      fg="#f38ba8", font=("Consolas", 12, "bold"), pady=8)
    header.pack(fill="x")
    server_lbl = tk.Label(window, text=f"Serveur : {server_name}", bg=bg, fg=fg,
                          font=("Consolas", 11))
    server_lbl.pack(pady=(10, 5))
    prompt_lbl = tk.Label(window, text="Justification obligatoire :", bg=bg,
                          fg="#f9e2af", font=("Consolas", 10))
    prompt_lbl.pack(anchor="w", padx=20)

    reason_var = tk.StringVar()
    reason_entry = tk.Entry(window, textvariable=reason_var, bg="#2a2a3e", fg=fg,
                            font=("Consolas", 10), insertbackground=fg, width=50)
    reason_entry.pack(padx=20, pady=5)

    def on_accept():
        reason = reason_var.get().strip()
        if len(reason) >= 5:
            answer[0] = reason
            window.destroy()
        else:
            messagebox.showwarning("Justification", "Minimum 5 caracteres")

    buttons = tk.Frame(window, bg=bg)
    buttons.pack(pady=10)
    tk.Button(buttons, text="Forcer le patching", bg="#d20f39", fg="white",
              font=("Consolas", 10, "bold"), command=on_accept).pack(side="left", padx=8)
    tk.Button(buttons, text="Annuler", bg="#313244", fg=fg,
              font=("Consolas", 10), command=window.destroy).pack(side="left", padx=8)

    center_window(window, 500, 220, parent=self.root)
    window.grab_set()
    # Block until the dialog is closed
    window.wait_window()

    justification = answer[0]
    if justification:
        self.audit("FORCE_PATCH_NO_SNAP", f"{server_name} : {justification}")
    return justification
# ==========================================================================
# A PROPOS
# ==========================================================================
def _show_about(self):
    """Open the "A propos" window (product name, version, team) and grab input."""
    bg, rule_color = "#1e1e2e", "#313244"
    window = tk.Toplevel(self.root)
    window.title("A propos")
    window.configure(bg=bg)
    window.resizable(False, False)

    def add_label(text, color, font, pack_opts=None, **label_opts):
        """Create one label on the dialog background and pack it."""
        widget = tk.Label(window, text=text, bg=bg, fg=color, font=font, **label_opts)
        widget.pack(**(pack_opts or {}))

    def add_rule():
        """Thin horizontal separator."""
        tk.Frame(window, bg=rule_color, height=1).pack(fill="x", padx=30, pady=12)

    # Accent bar at the top
    tk.Frame(window, bg="#e94560", height=3).pack(fill="x")
    add_label("SANEF Linux Patch Manager", "#89b4fa", ("Consolas", 16, "bold"),
              {"pady": (20, 4)})
    add_label(f"Version {VERSION}", "#cdd6f4", ("Consolas", 12))
    add_rule()
    add_label("SANEF DSI — Securite Operationnelle", "#cdd6f4", ("Consolas", 11))
    add_label("(KM)", "#a6e3a1", ("Consolas", 11), {"pady": (4, 0)})
    add_label("SECOPS 2026", "#6c7086", ("Consolas", 10), {"pady": (2, 0)})
    add_rule()
    add_label("Orchestration du patching Linux\nvia Excel + SSH + vSphere",
              "#6c7086", ("Consolas", 9), justify="center")
    close_btn = tk.Button(window, text="Fermer", bg=rule_color, fg="#cdd6f4",
                          font=("Consolas", 10), padx=20, pady=4,
                          command=window.destroy)
    close_btn.pack(pady=16)
    center_window(window, 380, 320, parent=self.root)
    window.grab_set()
# ==========================================================================
# SETTINGS
# ==========================================================================
def _open_settings(self):
    """Open the settings dialog; when it returns a result, adopt it and reload the SSH keys."""
    dialog = SettingsDialog(self.root, self.settings)
    if not dialog.result:
        return
    self.settings = dialog.result
    self._reload_keys()
def _reload_keys(self):
    """Load both private keys from the paths in settings ("keyfile", "keyfile2")."""
    cfg = self.settings
    primary_path = cfg.get("keyfile", "")
    self.pkey = load_key(primary_path)
    secondary_path = cfg.get("keyfile2", "")
    self.pkey2 = load_key(secondary_path)
# ==========================================================================
# ONGLET 1 — LOGIQUE
# ==========================================================================
def _browse_excel(self):
    """Let the user pick an Excel file, remember it in settings and load it."""
    chosen = filedialog.askopenfilename(
        filetypes=[("Excel", "*.xlsx *.xls"), ("All", "*.*")])
    if not chosen:
        # Dialog cancelled
        return
    self.excel_var.set(chosen)
    self.settings["excel_file"] = chosen
    save_settings(self.settings)
    self._load_excel()
def _prev_week(self):
sheets = list(self.sheet_cb["values"])
if not sheets: return
cur = self.sheet_var.get()
idx = sheets.index(cur) if cur in sheets else 0
if idx > 0:
self.sheet_var.set(sheets[idx-1])
self._load_sheet()
def _next_week(self):
sheets = list(self.sheet_cb["values"])
if not sheets: return
cur = self.sheet_var.get()
idx = sheets.index(cur) if cur in sheets else 0
if idx < len(sheets)-1:
self.sheet_var.set(sheets[idx+1])
self._load_sheet()
def _goto_current_week(self):
week_num = date.today().isocalendar()[1]
sheets = list(self.sheet_cb["values"])
for s in [f"S{week_num}", f"S{week_num:02d}"]:
if s in sheets:
self.sheet_var.set(s)
self._load_sheet()
return
def _load_excel(self):
    """Read the sheet names of the selected workbook and load the current week's sheet.

    The sheet list is pushed into the sheet combobox.  The sheet whose name
    contains the current ISO week number as a whole number (e.g. "S5" or
    "S05" for week 5, but not "S15") is selected; when none matches, the
    first sheet is used.  Any failure is reported in an error dialog.
    """
    import io
    import re

    path = self.excel_var.get().strip()
    if not path or not os.path.exists(path):
        messagebox.showerror("Erreur", f"Fichier introuvable :\n{path}")
        return
    self.audit("LOAD_EXCEL", path)
    try:
        try:
            # Work on an in-memory copy first; fall back to opening the path directly
            with open(path, "rb") as f:
                data = io.BytesIO(f.read())
            wb = openpyxl.load_workbook(data, read_only=True, data_only=True)
        except Exception:
            wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
        try:
            sheets = wb.sheetnames
        finally:
            wb.close()
        # Update the sheet combobox
        self.sheet_cb["values"] = sheets
        week_num = date.today().isocalendar()[1]
        # Compare whole numbers so that week 5 does not match "S15" or "S50"
        week_sheet = next(
            (name for name in sheets
             if any(int(tok) == week_num for tok in re.findall(r"[0-9]+", str(name)))),
            None)
        self.sheet_var.set(week_sheet or (sheets[0] if sheets else ""))
        self._load_sheet()
    except Exception as e:
        # This method only reads the workbook, so the message says "lire"
        messagebox.showerror("Erreur Excel", f"Impossible de lire l'Excel :\n{e}")
def _load_sheet(self):
    """Load the servers of the selected sheet and refresh filters, tree and buttons.

    ``self.is_current_week`` is set to True only when the sheet name
    contains the current ISO week number as a whole number (week 5 matches
    "S5"/"S05" but not "S15"); ``_go_patch`` refuses to run otherwise.
    """
    import re

    path = self.excel_var.get().strip()
    sheet = self.sheet_var.get()
    if not path or not sheet:
        return
    servers, _ = read_excel_servers(path, sheet)
    if servers is None:
        messagebox.showerror("Erreur", "Aucun serveur Linux trouvé")
        return
    # Detect whether this sheet is the current week (whole-number comparison)
    week_num = date.today().isocalendar()[1]
    self.is_current_week = any(
        int(tok) == week_num for tok in re.findall(r"[0-9]+", str(sheet)))
    # Update the visual indicator
    if self.is_current_week:
        self.week_mode_lbl.configure(
            text="✅ SEMAINE COURANTE — Patch autorisé",
            fg=self.GREEN)
    else:
        self.week_mode_lbl.configure(
            text=f"👁 VUE SEULE ({sheet}) — Patch désactivé",
            fg=self.YELLOW)
    self.servers = servers
    # Populate the filter comboboxes
    envs = sorted(set(s["env"] for s in servers if s["env"]))
    domains = sorted(set(s["domain"] for s in servers if s["domain"]))
    apps = sorted(set(s["app"] for s in servers if s["app"]))
    patchers = sorted(set(s["patcher"].strip() for s in servers
                          if s.get("patcher") and s["patcher"].strip()))
    self.filter_env_cb["values"] = ["Tous"] + envs
    self.filter_domain_cb["values"] = ["Tous"] + domains
    self.filter_app_cb["values"] = ["Tous"] + apps
    if self.filter_patcher_cb:
        self.filter_patcher_cb["values"] = ["Tous"] + patchers
    self.filter_env_var.set("Tous")
    self.filter_domain_var.set("Tous")
    self.filter_app_var.set("Tous")
    # Pre-select the patcher filter
    if self.current_user["role"] == "admin":
        patcher_setting = self.settings.get("patcher", "").strip()
        if patcher_setting and patcher_setting in patchers:
            self.filter_patcher_var.set(patcher_setting)
        else:
            self.filter_patcher_var.set("Tous")
    else:
        # Non-admin users are restricted by _apply_filters itself
        self.filter_patcher_var.set("Tous")
    self._apply_filters()
    # Restyle the GO button for the week mode; it stays clickable in both
    # modes (state="normal"), _go_patch shows the explanation when blocked
    if hasattr(self, "go_btn"):
        if self.is_current_week:
            self.go_btn.configure(bg="#d20f39", state="normal",
                                  text="🚀 GO PATCH")
        else:
            self.go_btn.configure(bg="#45475a", state="normal",
                                  text="👁 VUE SEULE — Patch désactivé")
    # Check snapshots when some non-physical servers have accord=oui
    accord_servers = [s for s in self.servers
                      if s.get("accord") == "oui" and not s.get("is_physical")]
    if accord_servers and VSPHERE_OK:
        self.root.after(200, self._prompt_vcenter_and_check_snaps)
def _apply_filters(self):
env_f = self.filter_env_var.get()
domain_f = self.filter_domain_var.get()
app_f = self.filter_app_var.get()
patcher_f = self.filter_patcher_var.get()
today_f = self.filter_today_var.get()
today = date.today()
# Non-admin : ne voit que ses serveurs (intervenant = son username)
is_admin = self.current_user["role"] == "admin"
my_name = self.current_user["username"].lower()
filtered = []
for s in self.servers:
# Filtre par utilisateur connecte (non-admin)
if not is_admin:
srv_patcher = (s.get("patcher", "") or "").strip().lower()
if srv_patcher != my_name:
continue
if patcher_f != "Tous":
srv_patcher = (s.get("patcher", "") or "").strip()
patcher_cmp = patcher_f.strip()
if srv_patcher and srv_patcher.lower() != patcher_cmp.lower():
continue
if env_f != "Tous" and s["env"] != env_f: continue
if domain_f != "Tous" and s["domain"] != domain_f: continue
if app_f != "Tous" and s["app"] != app_f: continue
if today_f and s["date_patch"] and s["date_patch"] != today: continue
filtered.append(s)
self._populate_tree(filtered)
def _populate_tree(self, servers):
for item in self.srv_tree.get_children():
self.srv_tree.delete(item)
# Compteur pour gérer les doublons de nom de serveur dans la feuille
seen = {}
for s in servers:
accord_ok = s["accord"] == "oui"
sel_mark = "" if s.get("selected") and accord_ok else ("" if accord_ok else "")
accord_lbl= "✅ OUI" if accord_ok else "❌ NON"
snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "")
date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else ""
if s.get("already_patched"):
tag = "already_patched"
elif not accord_ok:
tag = "no_accord"
elif s.get("selected"):
tag = "selected"
else:
tag = "normal"
# Générer un iid unique si le nom de serveur apparaît en double
base_iid = s["server"]
seen[base_iid] = seen.get(base_iid, 0) + 1
iid = base_iid if seen[base_iid] == 1 else f"{base_iid}_#{seen[base_iid]}"
s["_iid"] = iid # stocker pour _refresh_tree_item
self.srv_tree.insert("", "end",
iid=iid,
values=(sel_mark, accord_lbl, s["server"],
s.get("patcher",""), s["env"],
s["domain"], s["app"], date_str, snap_lbl, s["status"]),
tags=(tag,))
n = sum(1 for s in servers if s.get("selected") and s["accord"]=="oui")
self.count_lbl.configure(text=f"{len(servers)} serveurs affichés — {n} sélectionnés")
def _get_server(self, name):
for s in self.servers:
if s["server"] == name:
return s
return None
def _get_server_by_iid(self, iid):
"""Retrouver un serveur par son iid treeview (gère les doublons de noms)."""
for s in self.servers:
if s.get("_iid") == iid:
return s
# Fallback : chercher par nom (compatibilité)
return self._get_server(iid)
def _toggle_srv(self, event):
region = self.srv_tree.identify_region(event.x, event.y)
if region != "cell": return
item = self.srv_tree.identify_row(event.y)
if not item: return
# Retrouver le serveur par son iid stocké (gère les doublons)
s = self._get_server_by_iid(item)
if not s or s["accord"] != "oui": return # accord obligatoire
s["selected"] = not s.get("selected", False)
self._refresh_tree_item(s)
n = sum(1 for x in self.servers if x.get("selected") and x["accord"]=="oui")
self.count_lbl.configure(text=f"{n} sélectionnés")
def _refresh_tree_item(self, s):
# Chercher l'iid : d'abord _iid stocké, sinon nom serveur, sinon chercher dans l'arbre
iid = s.get("_iid", s["server"])
if not self.srv_tree.exists(iid):
# Fallback : chercher parmi tous les items de l'arbre par valeur "server"
for item in self.srv_tree.get_children():
vals = self.srv_tree.item(item, "values")
if vals and len(vals) > 2 and vals[2] == s["server"]:
iid = item
s["_iid"] = iid # mémoriser pour la prochaine fois
break
else:
return # serveur pas dans l'arbre (filtré)
accord_ok = s["accord"] == "oui"
sel_mark = "" if s.get("selected") else ""
snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "")
date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else ""
tag = "selected" if s.get("selected") else "normal"
self.srv_tree.item(iid, values=(
sel_mark, "✅ OUI" if accord_ok else "❌ NON",
s["server"], s.get("patcher",""), s["env"], s["domain"], s["app"],
date_str, snap_lbl, s["status"]), tags=(tag,))
def _select_all_ok(self):
for s in self.servers:
if s["accord"] == "oui" and self.srv_tree.exists(s["server"]):
s["selected"] = True
self._refresh_tree_item(s)
n = sum(1 for s in self.servers if s.get("selected"))
self.count_lbl.configure(text=f"{n} sélectionnés")
def _select_none(self):
for s in self.servers:
s["selected"] = False
if self.srv_tree.exists(s["server"]):
self._refresh_tree_item(s)
self.count_lbl.configure(text="— 0 sélectionnés")
def _invert(self):
for s in self.servers:
if s["accord"] == "oui" and self.srv_tree.exists(s["server"]):
s["selected"] = not s.get("selected", False)
self._refresh_tree_item(s)
n = sum(1 for s in self.servers if s.get("selected"))
self.count_lbl.configure(text=f"{n} sélectionnés")
# ==========================================================================
# SNAPSHOT VSPHERE
# ==========================================================================
def _prompt_vcenter_and_check_snaps(self):
    """Check existing snapshots for the selected, approved, non-physical servers.

    When a vCenter user and a session password are already present in the
    settings, the check starts immediately in a background thread.
    Otherwise a dialog asks for the credentials; the password entry is
    masked.  The user may also skip the check.
    """
    accord_servers = [s for s in self.servers
                      if s.get("selected") and s.get("accord") == "oui"
                      and not s.get("is_physical")]
    if not accord_servers:
        return

    def start_check(user, pwd):
        """Log the start and launch the snapshot check thread."""
        self._log_patch(
            f"Vérification snapshots ({len(accord_servers)} serveurs avec accord)...", "info")
        threading.Thread(target=self._check_snap_worker,
                         args=(accord_servers, user, pwd), daemon=True).start()

    # Credentials already known: use them directly
    vs_user = self.settings.get("vs_user", "").strip()
    vs_pwd = self.settings.get("_vs_pwd_session", "").strip()
    if vs_user and vs_pwd:
        start_check(vs_user, vs_pwd)
        return
    # Otherwise prompt for the credentials
    dlg = tk.Toplevel(self.root)
    dlg.title("vSphere — Vérification snapshots")
    dlg.configure(bg=self.BG)
    dlg.grab_set()
    dlg.resizable(False, False)
    self._center_dialog(dlg, 400, 230)
    tk.Label(dlg, text="Connexion vCenter", bg=self.BG, fg=self.ACCENT,
             font=("Consolas", 12, "bold")).pack(pady=10)
    msg_lbl = f"Vérification snapshots pour {len(accord_servers)} serveur(s) avec accord."
    tk.Label(dlg, text=msg_lbl, bg=self.BG, fg=self.FG,
             font=("Consolas", 9), justify="center").pack(pady=(0, 8))
    # The password field is masked, like the one in _take_snapshots
    for lbl, attr, show, default in [
        ("Utilisateur :", "dlg_vs_user", "", self.settings.get("vs_user", "")),
        ("Mot de passe :", "dlg_vs_pwd", "\u25cf", ""),
    ]:
        f = tk.Frame(dlg, bg=self.BG)
        f.pack(fill="x", padx=24, pady=4)
        tk.Label(f, text=lbl, bg=self.BG, fg=self.FG,
                 font=("Consolas", 10), width=14).pack(side="left")
        var = tk.StringVar(value=default)
        setattr(self, attr, var)
        tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG,
                 font=("Consolas", 10), insertbackground=self.FG,
                 width=26).pack(side="left", padx=4)

    def do_check():
        user = self.dlg_vs_user.get().strip()
        pwd = self.dlg_vs_pwd.get().strip()
        if not user or not pwd:
            messagebox.showwarning("Attention", "Identifiants requis.", parent=dlg)
            return
        # Keep the credentials in settings so they are not asked again.
        # NOTE(review): the session password is in self.settings when
        # save_settings() is called; confirm save_settings does not write
        # "_vs_pwd_session" to disk in clear text.
        self.settings["vs_user"] = user
        self.settings["_vs_pwd_session"] = pwd
        save_settings(self.settings)
        dlg.destroy()
        start_check(user, pwd)

    def skip():
        dlg.destroy()
        self._log_patch("Vérification snapshot ignorée.", "warn")

    bf = tk.Frame(dlg, bg=self.BG)
    bf.pack(pady=10)
    tk.Button(bf, text="🔍 Vérifier", bg=self.ACCENT, fg="white",
              font=("Consolas", 11, "bold"), padx=10, pady=4,
              command=do_check).pack(side="left", padx=6)
    tk.Button(bf, text="Ignorer", bg=self.BG2, fg=self.FG,
              font=("Consolas", 10), padx=10, pady=4,
              command=skip).pack(side="left", padx=6)
    dlg.bind("<Return>", lambda e: do_check())
def _check_snap_worker(self, servers, vs_user, vs_pwd):
    """Thread body: look for an existing "PrePatch_" snapshot on each vCenter.

    Every host of ``VSPHERE_HOSTS`` is queried in turn for the servers not
    yet marked "snap_done"; a server is marked when its VM has a snapshot
    whose name starts with "PrePatch_".  The vCenter session is always
    disconnected, including when a lookup raises.

    Args:
        servers: server dicts to check (mutated: "snap_done" is set).
        vs_user: vCenter user name.
        vs_pwd: vCenter password.
    """
    import ssl

    snap_prefix = "PrePatch_"

    def has_prepatch_snap(vm):
        """Return True when the VM has a snapshot whose name starts with PrePatch_."""
        if not vm.snapshot or not vm.snapshot.rootSnapshotList:
            return False

        def walk(snap_list):
            for snap in snap_list:
                if snap.name.startswith(snap_prefix):
                    return True
                if snap.childSnapshotList and walk(snap.childSnapshotList):
                    return True
            return False

        return walk(vm.snapshot.rootSnapshotList)

    for vc in VSPHERE_HOSTS:
        # Servers not found yet
        pending = [s for s in servers if not s.get("snap_done")]
        if not pending:
            break
        try:
            # NOTE(review): certificate and hostname verification are
            # disabled for the vCenter connection; confirm this is intended.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx)
            try:
                vc_content = si.RetrieveContent()
                for s in pending:
                    vm = self._find_vm(vc_content, s["server"])
                    if not vm:
                        continue
                    if has_prepatch_snap(vm):
                        s["snap_done"] = True
                        self._log_patch(f" ✅ Snapshot existant : {s['server']}", "ok")
                        self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
                    else:
                        self._log_patch(f" ⚠ Pas de snapshot : {s['server']}", "warn")
            finally:
                # Release the session even when a lookup failed
                Disconnect(si)
        except Exception as e:
            self._log_patch(f" Erreur vCenter {vc} : {e}", "ko")
    # Summary
    ok = sum(1 for s in servers if s.get("snap_done"))
    ko = len(servers) - ok
    self._log_patch(
        f"Snapshots : {ok} trouvé(s), {ko} manquant(s) sur {len(servers)} serveur(s) avec accord",
        "ok" if ko == 0 else "warn")
def _center_dialog(self, dlg, w, h):
    """Center *dlg* (width *w*, height *h*) using the main window as parent."""
    main_window = self.root
    center_window(dlg, w, h, parent=main_window)
def _take_snapshots(self):
    """Ask for vSphere credentials and snapshot the selected, approved servers.

    Nothing is started when no server is selected, when pyVmomi is not
    available, or when the user leaves the user name or password empty
    (a warning is shown and the dialog stays open).
    """
    selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
    if not selected:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné (avec accord).")
        return
    if not VSPHERE_OK:
        messagebox.showwarning("pyVmomi manquant",
                               "pyVmomi non installé.\n\n"
                               "Installer avec :\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080\n\n"
                               "Après installation, relancer l'application.")
        return
    # vSphere credentials prompt
    dlg = tk.Toplevel(self.root)
    dlg.title("Identifiants vSphere")
    dlg.configure(bg=self.BG)
    dlg.grab_set()
    dlg.resizable(False, False)
    tk.Label(dlg, text="Connexion vSphere", bg=self.BG, fg=self.ACCENT,
             font=("Consolas", 12, "bold")).pack(pady=10)
    for lbl, attr, show, default_key in [
        ("Utilisateur :", "vs_user", "", "vs_user"),
        ("Mot de passe :", "vs_pwd", "\u25cf", "")
    ]:
        f = tk.Frame(dlg, bg=self.BG)
        f.pack(fill="x", padx=20, pady=4)
        tk.Label(f, text=lbl, bg=self.BG, fg=self.FG,
                 font=("Consolas", 10), width=14).pack(side="left")
        var = tk.StringVar(value=self.settings.get(default_key, ""))
        setattr(self, attr, var)
        tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG,
                 font=("Consolas", 10), insertbackground=self.FG).pack(
                     side="left", fill="x", expand=True)
    center_window(dlg, 400, 200, parent=self.root)

    def do_snap():
        user = self.vs_user.get().strip()
        pwd = self.vs_pwd.get().strip()
        # Same guard as the snapshot-check dialog: do not try every vCenter
        # with empty credentials
        if not user or not pwd:
            messagebox.showwarning("Attention", "Identifiants requis.", parent=dlg)
            return
        dlg.destroy()
        threading.Thread(target=self._snap_worker,
                         args=(selected, user, pwd), daemon=True).start()

    tk.Button(dlg, text="📸 Prendre snapshots", bg="#e64553", fg="white",
              font=("Consolas", 11, "bold"), pady=6,
              command=do_snap).pack(pady=10)
def _snap_worker(self, servers, vs_user, vs_pwd):
    """Thread body: create a "PrePatch_<YYYYMMDD>" snapshot for each server.

    Every host of ``VSPHERE_HOSTS`` is tried in turn for the servers not
    yet marked "snap_done".  The vCenter session is always disconnected.
    The final report distinguishes VMs that were never found from VMs that
    were found but whose snapshot failed.

    Args:
        servers: server dicts to snapshot (mutated: "snap_done" is set).
        vs_user: vCenter user name.
        vs_pwd: vCenter password.
    """
    import ssl

    snap_name = f"PrePatch_{date.today():%Y%m%d}"
    # id() of the server dicts whose VM was found but could not be snapshotted
    failed = set()
    # Work on the servers that have no snapshot yet
    pending = [s for s in servers if not s.get("snap_done")]
    for vc in VSPHERE_HOSTS:
        # Stop as soon as every server has its snapshot
        pending = [s for s in pending if not s.get("snap_done")]
        if not pending:
            break
        self._log_patch(f"Connexion vCenter : {vc}", "info")
        try:
            # NOTE(review): certificate and hostname verification are
            # disabled for the vCenter connection; confirm this is intended.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd,
                              sslContext=ctx)
            try:
                content = si.RetrieveContent()
                for s in pending:
                    if s.get("snap_done"):
                        continue  # already done on a previous vCenter
                    vm = self._find_vm(content, s["server"])
                    if not vm:
                        # Not logged here: the VM may live on another vCenter
                        continue
                    try:
                        task = vm.CreateSnapshot_Task(
                            name=snap_name,
                            description=f"Pre-patch automatique {date.today()}",
                            memory=False, quiesce=False)
                        # NOTE(review): polling has no timeout; a task that
                        # never finishes blocks this thread.
                        while task.info.state not in ("success", "error"):
                            time.sleep(1)
                        if task.info.state == "success":
                            s["snap_done"] = True
                            failed.discard(id(s))
                            self._log_patch(f" ✅ Snapshot OK : {s['server']}", "ok")
                            # Refresh the row right away, without waiting for the end
                            self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
                        else:
                            failed.add(id(s))
                            self._log_patch(
                                f" ❌ Snapshot KO : {s['server']} ({task.info.error})", "ko")
                    except Exception as e:
                        failed.add(id(s))
                        self._log_patch(f"{s['server']} : {e}", "ko")
            finally:
                # Release the session even when something above failed
                Disconnect(si)
        except Exception as e:
            self._log_patch(f" Erreur vCenter {vc} : {e}", "ko")
    # Final report
    ok = [s for s in servers if s.get("snap_done")]
    ko = [s for s in servers if not s.get("snap_done") and not s.get("is_physical")]
    for s in ko:
        if id(s) in failed:
            # The VM exists: do not report it as "not found"
            self._log_patch(f" ⚠ Snapshot en échec (VM trouvée) : {s['server']}", "warn")
        else:
            self._log_patch(f" ⚠ VM non trouvée sur aucun vCenter : {s['server']}", "warn")
    self._log_patch(
        f"Snapshots terminés : {len(ok)}/{len(servers)} OK", "ok" if not ko else "warn")
    # Refresh every row at the end
    self.root.after(0, lambda: [self._refresh_tree_item(s) for s in servers])
def _find_vm(self, content, name):
"""Cherche une VM par nom exact ou partiel dans tout le vCenter"""
name_short = name.split(".")[0].lower() # Sans le domaine
def search_folder(folder):
try:
for obj in folder.childEntity:
if hasattr(obj, "childEntity"):
result = search_folder(obj)
if result:
return result
if hasattr(obj, "name"):
vm_name = obj.name.lower()
if vm_name == name.lower() or vm_name == name_short:
return obj
if name_short in vm_name or vm_name in name_short:
return obj
except Exception:
pass
return None
try:
for dc in content.rootFolder.childEntity:
result = search_folder(dc.vmFolder)
if result:
return result
except Exception:
pass
return None
# ==========================================================================
# ONGLET 2 — LOGIQUE PATCH
# ==========================================================================
def _on_preset(self, event=None):
    """React to a preset change: fill the package fields and refresh the command preview.

    A preset mapped to "CUSTOM" enables the package and exclude entries and
    clears both fields; any other preset copies its package list and clears
    the custom excludes.
    """
    preset_value = PRESET_PACKAGES.get(self.preset_var.get(), "")
    custom_mode = preset_value == "CUSTOM"
    if custom_mode:
        # Custom mode: make both fields editable
        for entry in (self.packages_entry, self.custom_excludes_entry):
            entry.configure(state="normal")
    self.packages_var.set("" if custom_mode else preset_value)
    self.custom_excludes_var.set("")
    self._preview_cmd()
def _preview_cmd(self):
"""Genere la commande yum et l'affiche dans le champ editable"""
preset = self.preset_var.get()
packages = self.packages_var.get().strip()
custom_excl = self.custom_excludes_var.get().strip()
exclude_kernel = self.exclude_kernel_var.get()
dryrun = self.dryrun_var.get()
if preset == "Personnalise..." or custom_excl:
# Mode personnalise
if packages and not custom_excl:
# Packages specifiques a patcher uniquement
if dryrun:
cmd = f"sudo yum check-update {packages} 2>/dev/null | head -30"
else:
cmd = f"sudo yum update -y {packages} 2>&1 | tail -25"
elif custom_excl:
# Excludes personnalises
excl_parts = " ".join(f"--exclude={e.strip()}" for e in custom_excl.split() if e.strip())
if exclude_kernel and "--exclude=kernel" not in excl_parts.lower():
excl_parts += " --exclude=*kernel*"
if packages:
if dryrun:
cmd = f"sudo yum check-update {packages} {excl_parts} 2>/dev/null | head -30"
else:
cmd = f"sudo yum update -y {packages} {excl_parts} 2>&1 | tail -25"
else:
if dryrun:
cmd = f"sudo yum check-update {excl_parts} 2>/dev/null | head -30"
else:
cmd = f"sudo yum update -y {excl_parts} 2>&1 | tail -25"
else:
cmd = "sudo yum check-update 2>/dev/null | head -30" if dryrun else "sudo yum update -y 2>&1 | tail -25"
else:
# Mode preset standard — utilise build_yum_command
cmd = build_yum_command("standard", packages, exclude_kernel, dryrun)
cmd = "sudo " + cmd if not cmd.startswith("sudo") else cmd
self.final_cmd_var.set(cmd)
def _go_patch(self):
    """Validate the selection, ask for the confirmations and start the patch thread.

    The run is refused outside the current week and when nothing is
    selected.  The user must confirm when a non-physical server has no
    snapshot, and again when the run is not a dry run.
    """
    # Patching is only allowed on the current week's sheet
    if not self.is_current_week:
        messagebox.showwarning(
            "Patch desactive",
            "Vous consultez une autre semaine que la semaine courante.\n\n"
            "Le patch n'est autorise que sur la semaine en cours.\n"
            "Utilisez le bouton ↺ pour revenir a la semaine courante.")
        return

    targets = [srv for srv in self.servers
               if srv.get("selected") and srv["accord"] == "oui"]
    if not targets:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné avec accord.")
        return

    # Non-physical servers without a snapshot need an explicit go-ahead
    unprotected = [srv for srv in targets
                   if not (srv.get("snap_done") or srv.get("is_physical"))]
    if unprotected:
        listing = "\n".join(srv["server"] for srv in unprotected[:8])
        proceed = messagebox.askyesno(
            "⚠️ Snapshot manquant",
            f"{len(unprotected)} serveur(s) sans snapshot :\n{listing}\n\n"
            "Continuer SANS snapshot ?",
            icon="warning")
        if not proceed:
            return

    real_run = not self.dryrun_var.get()
    if real_run:
        confirmed = messagebox.askyesno(
            "Confirmation PATCH RÉEL",
            f"Lancer le PATCH RÉEL sur {len(targets)} serveur(s) ?\n\n"
            "Cette action est irréversible !",
            icon="warning")
        if not confirmed:
            return

    # Reset the run state and lock the buttons before starting the thread
    self.cyb_pwd = self.cybpwd_var.get().strip() or None
    self.running = True
    self.apply_all_cmd = False
    self.results = []
    self.go_btn.configure(state="disabled")
    self.stop_btn.configure(state="normal")
    patch_thread = threading.Thread(
        target=self._patch_worker,
        args=(targets,),
        daemon=True)
    patch_thread.start()
def _stop(self):
self.running = False
self._log_patch("⏹ Arrêt demandé...", "warn")
def _patch_worker(self, servers):
    """Patch every server of *servers* in sequence (background-thread entry point).

    For each server: ask the operator to confirm the yum command (unless an
    "apply to all" answer was already given), open the SSH session, run the
    pre-patch scripts, run yum, compare package versions before/after, check
    whether a reboot is needed, then record the outcome in ``self.results``
    and refresh the server tree and the progress bar.

    Args:
        servers: list of server dicts (keys read here: ``server``,
            ``domain``, ``env``). Each dict is updated in place with
            ``status``, ``patch_status``, ``patch_detail``,
            ``reboot_required``, ``is_physical`` and ``duration``.

    The buttons are re-enabled and the summary is shown even if an
    unexpected exception aborts the run; the exception itself is not
    swallowed.
    """
    total = len(servers)
    packages = self.packages_var.get().strip()
    exclude_kernel = self.exclude_kernel_var.get()
    dryrun = self.dryrun_var.get()
    pkg_names = packages.split()
    approved_cmd = None  # command validated through "apply to all"

    def installed_versions(client):
        """Return ``{package: `rpm -q` output}`` for the requested packages."""
        versions = {}
        for pkg in pkg_names:
            out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15)
            versions[pkg] = out.strip()
        return versions

    def finish():
        """Restore the buttons and show the summary (scheduled on the Tk loop)."""
        self.go_btn.configure(state="normal")
        self.stop_btn.configure(state="disabled")
        self._patch_done_summary()

    try:
        for idx, s in enumerate(servers):
            if not self.running:
                break
            server = s["server"]
            self._log_patch(f"\n[{idx+1}/{total}] ── {server} ──", "info")

            # Build the yum command for this server
            yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel, dryrun)

            # Command confirmation (skipped once "apply to all" was chosen)
            if not self.apply_all_cmd:
                event = threading.Event()
                dialog_result = [None]

                def show_dialog(cmd=yum_cmd, srv=server, ev=event, res=dialog_result):
                    # The event is set in ``finally`` so that a failure while
                    # building the dialog cannot leave the worker blocked on
                    # ``event.wait()`` forever.
                    try:
                        dlg = YumConfirmDialog(self.root, srv, cmd)
                        res[0] = dlg.result
                        if dlg.result in ("apply", "apply_all"):
                            res.append(dlg.final_cmd)
                    finally:
                        ev.set()

                self.root.after(0, show_dialog)
                event.wait()
                action = dialog_result[0]

                # Anything that is not an explicit approval ("cancel", a
                # closed dialog leaving the result unset, a dialog error)
                # skips the server instead of running an unconfirmed patch.
                if action not in ("apply", "apply_all"):
                    self._log_patch(f" ↩ Serveur ignoré par l'utilisateur", "warn")
                    s["status"] = "IGNORE"
                    self._update_tree_status(s)
                    self._update_progress(idx+1, total)
                    continue
                if len(dialog_result) > 1:
                    yum_cmd = dialog_result[1]
                if action == "apply_all":
                    self.apply_all_cmd = True
                    approved_cmd = yum_cmd
            elif approved_cmd:
                yum_cmd = approved_cmd

            self._log_patch(f" Connexion...", "info")
            t0 = time.time()
            client, eff_host = ssh_connect(
                server, s["env"], self.settings,
                self.pkey, self.pkey2, self.cyb_pwd)
            if client is None:
                dur = round(time.time() - t0, 1)
                self._log_patch(f" ❌ Connexion impossible ({dur}s)", "ko")
                s["status"] = "AUTH_KO"
                self.results.append({**s, "duration": f"{dur}s",
                                     "patch_status": "KO",
                                     "patch_detail": "Connexion impossible"})
                self._update_tree_status(s)
                self._update_progress(idx+1, total)
                continue

            try:
                self._log_patch(f" ✅ Connecté : {eff_host}", "ok")

                # Physical host detection
                s["is_physical"] = detect_physical(client)
                if s["is_physical"]:
                    self._log_patch(f" 🖥 Serveur physique détecté", "warn")

                # Pre-patching
                self._log_patch(f" Pre-patching...", "info")
                pre_out, _ = run_cmd(client, PRE_PATCH_SCRIPT, 30)
                if "PRE_PATCH_OK" in pre_out:
                    self._log_patch(f" Pre-patch OK", "ok")
                else:
                    self._log_patch(f" Pre-patch incomplet", "warn")

                # Flux Libre domain: additional Podman pods pre-patch script
                is_fl = "flux libre" in s.get("domain", "").lower()
                if is_fl:
                    self._log_patch(f" Flux Libre : snapshot pods Podman...", "info")
                    fl_out, _ = run_cmd(client, FL_PRE_PATCH_SCRIPT, 30)
                    if "FL_PRE_OK" in fl_out:
                        for line in fl_out.splitlines():
                            if line.startswith("FL_USER=") or line.startswith("FL_PODS_SAVED="):
                                self._log_patch(f" {line}", "ok")
                    elif "FL_NO_PODS" in fl_out:
                        self._log_patch(f" Pas de pods Podman detectes", "info")
                    else:
                        self._log_patch(f" Snapshot pods incomplet", "warn")

                # Package versions before
                ver_before = installed_versions(client)

                # Run yum and echo the last 10 lines of its output
                self._log_patch(f" 🔧 Commande : {yum_cmd[:80]}...", "cmd")
                yum_out, yum_err = run_cmd(client, yum_cmd, 600)
                if yum_out:
                    for line in yum_out.splitlines()[-10:]:
                        self._log_patch(f" {line}", "info")

                # Package versions after
                ver_after = installed_versions(client)
                dur = round(time.time() - t0, 1)

                # Determine the patch status
                if dryrun:
                    pkg_lines = [line for line in yum_out.splitlines()
                                 if line.split() and "." in line.split()[0]
                                 and not any(x in line for x in ["kB", "bps", "00:", "Updat"])]
                    if pkg_lines:
                        s["patch_status"] = "UPDATE_AVAIL"
                        s["patch_detail"] = " | ".join(pkg_lines)[:200]
                        self._log_patch(f" ⚠ Updates disponibles : {len(pkg_lines)} package(s)", "warn")
                    else:
                        s["patch_status"] = "UP_TO_DATE"
                        s["patch_detail"] = "Deja a jour"
                        self._log_patch(f" ✅ A jour", "ok")
                else:
                    updated = [p for p in pkg_names
                               if ver_before.get(p) != ver_after.get(p)]
                    if updated:
                        s["patch_status"] = "PATCHED"
                        s["patch_detail"] = " | ".join(
                            f"{p}: {ver_before.get(p,'?')} -> {ver_after.get(p,'?')}"
                            for p in updated)[:200]
                        self._log_patch(f" ✅ PATCHÉ : {s['patch_detail'][:80]}", "ok")
                    elif "Nothing to do" in yum_out or "No packages" in yum_out:
                        # Tested before "Complete!" because dnf prints
                        # "Nothing to do." followed by "Complete!" when the
                        # system is already up to date.
                        s["patch_status"] = "UP_TO_DATE"
                        s["patch_detail"] = "Deja a jour"
                        self._log_patch(f" ✅ Déjà à jour", "ok")
                    elif "Complete!" in yum_out:
                        s["patch_status"] = "PATCHED"
                        s["patch_detail"] = "Packages mis a jour"
                        self._log_patch(f" ✅ PATCHÉ : {s['patch_detail'][:80]}", "ok")
                    else:
                        # Neither a success nor a "nothing to do" marker:
                        # report a failure instead of "up to date" so that a
                        # yum error is not hidden from the operator.
                        failure = (yum_err or yum_out or "").strip().splitlines()
                        s["patch_status"] = "KO"
                        s["patch_detail"] = (failure[-1] if failure
                                             else "Echec yum (aucune sortie)")[:200]
                        self._log_patch(f" ❌ Echec yum : {s['patch_detail'][:80]}", "ko")

                # Reboot check: the command's exit status is echoed as RC:<n>
                reboot_out, _ = run_cmd(client, "needs-restarting -r 2>/dev/null; echo RC:$?", 15)
                if "RC:1" in reboot_out:
                    s["reboot_required"] = True
                    self._log_patch(f" ⚠ REBOOT REQUIS", "warn")
                else:
                    s["reboot_required"] = False
                s["status"] = "OK"
                s["duration"] = f"{dur}s"
            finally:
                # Always release the SSH session, even if a remote command raised
                client.close()

            self.results.append(deepcopy(s))
            self._update_tree_status(s)
            self._update_progress(idx+1, total)
    finally:
        # End of run: never leave the UI stuck in the "running" state
        self.running = False
        self.root.after(0, finish)
def _update_tree_status(self, s):
    """Schedule a refresh of the server tree row belonging to server dict *s*.

    The actual update runs through ``self.root.after(0, ...)``. It writes
    ``s["status"]`` into value index 9 of the row and tags the row ``"ok"``,
    ``"ko"`` or ``"normal"``. Nothing happens if the row does not exist.
    """
    def refresh_row():
        row_id = s["server"]
        if not self.srv_tree.exists(row_id):
            return
        columns = list(self.srv_tree.item(row_id, "values"))
        columns[9] = s.get("status", "")  # status is stored at value index 9
        state = s.get("status")
        if state == "OK" and s.get("patch_status") != "KO":
            row_tag = "ok"
        elif state == "AUTH_KO":
            row_tag = "ko"
        else:
            row_tag = "normal"
        self.srv_tree.item(row_id, values=columns, tags=(row_tag,))

    self.root.after(0, refresh_row)
def _update_progress(self, done, total):
    """Schedule an update of the progress variable and label to *done* out of *total*.

    The percentage is ``done / total * 100``; *total* must be non-zero.
    """
    percent = (done / total) * 100

    def apply_progress():
        self.prog_var.set(percent)
        self.prog_lbl.configure(text=f"{done}/{total} ({percent:.0f}%)")

    self.root.after(0, apply_progress)
def _patch_done_summary(self):
    """Show the end-of-run summary, then build the report and switch to its tab.

    Counts are taken from ``self.results``. A second warning dialog is shown
    when at least one result has ``reboot_required`` set.
    """
    reached = patched = reboots = failed = 0
    for entry in self.results:
        if entry.get("status") == "OK":
            reached += 1
        else:
            failed += 1
        if entry.get("patch_status") == "PATCHED":
            patched += 1
        if entry.get("reboot_required"):
            reboots += 1

    summary_lines = [
        "Patch termine !",
        "",
        f"Total traite : {len(self.results)}",
        f"OK : {reached}",
        f"Patche : {patched}",
        f"Reboot requis: {reboots}",
        f"KO/Auth : {failed}",
    ]
    messagebox.showinfo("Patch termine", "\n".join(summary_lines))

    if reboots > 0:
        messagebox.showwarning(
            "Reboot requis",
            f"{reboots} serveur(s) necessitent un reboot.\n"
            "Utiliser l'onglet Post-patching > Reboot.")

    # Generate the report shortly after the dialogs, then select the report tab
    self.root.after(500, self._generate_report)
    self.root.after(600, lambda: self.nb.select(self.tab_report))
# ==========================================================================
# ONGLET 3 — POST-PATCHING
# ==========================================================================
def _post_patch_check(self):
    """Start the post-patch checks in a daemon thread.

    Targets are the servers flagged ``selected`` whose ``accord`` is
    ``"oui"``; a warning is shown when there are none.
    """
    targets = []
    for srv in self.servers:
        if srv.get("selected") and srv["accord"] == "oui":
            targets.append(srv)

    if targets:
        checker = threading.Thread(target=self._post_worker, args=(targets,), daemon=True)
        checker.start()
        return
    messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
def _post_worker(self, servers):
    """Run the post-patch script on each server and log its output line by line.

    Each output line is logged with a level chosen from keywords found in
    the line (``ALERTE``/"disparu" -> ko, ``WARN``/"nouveau" -> warn,
    ``OK`` -> ok, otherwise info). For servers whose ``domain`` contains
    "flux libre", the Flux Libre post-patch script is run as well and its
    ``FL_*`` markers are translated into log messages.

    Args:
        servers: list of server dicts (keys read: ``server``, ``env``,
            ``domain``).
    """
    for s in servers:
        self._log_post(f"\n── {s['server']} ──", "info")
        client, eff = ssh_connect(s["server"], s["env"], self.settings,
                                  self.pkey, self.pkey2, self.cyb_pwd)
        if not client:
            self._log_post(f" ❌ Connexion impossible", "ko")
            continue
        try:
            out, _ = run_cmd(client, POST_PATCH_SCRIPT, 60)
            for line in out.splitlines():
                if "ALERTE" in line or "disparu" in line.lower():
                    level = "ko"
                elif "WARN" in line or "nouveau" in line.lower():
                    level = "warn"
                elif "OK" in line:
                    level = "ok"
                else:
                    level = "info"
                self._log_post(f" {line}", level)

            # Flux Libre: check the pods (the script reports restarts via FL_* markers)
            is_fl = "flux libre" in s.get("domain", "").lower()
            if is_fl:
                self._log_post(f" Flux Libre : verification pods Podman...", "info")
                fl_out, _ = run_cmd(client, FL_POST_PATCH_SCRIPT, 60)
                for line in fl_out.splitlines():
                    if "FL_RESTART=" in line:
                        pod = line.split("=", 1)[1]
                        self._log_post(f" Redemarrage pod: {pod}", "warn")
                    elif "FL_ALL_OK" in line:
                        self._log_post(f" Tous les pods sont actifs", "ok")
                    elif "FL_RESTARTED=" in line:
                        n = line.split("=", 1)[1]
                        self._log_post(f" {n} pod(s) redemarres", "warn")
                    elif "FL_USER=" in line:
                        self._log_post(f" {line}", "info")
                    elif "FL_NO_PODS" in line:
                        self._log_post(f" Pas de pods Podman", "info")
        finally:
            # Release the SSH session even if a remote command raised
            client.close()
def _ask_mark_patched(self):
    """Ask for confirmation, then mark servers green in the Excel file.

    Candidates are looked up in this order: selected servers whose
    ``patch_status`` is ``"PATCHED"``; patched entries of ``self.results``;
    finally (after a warning) every selected server with ``accord == "oui"``.
    Returns without doing anything when no candidate is found.
    """
    candidates = [srv for srv in self.servers
                  if srv.get("patch_status") == "PATCHED" and srv.get("selected")]
    if not candidates:
        candidates = [res for res in self.results
                      if res.get("patch_status") == "PATCHED"]
    if not candidates:
        messagebox.showwarning(
            "Attention",
            "Aucun serveur patche detecte.\n"
            "Selectionner manuellement les serveurs a marquer.")
        candidates = [srv for srv in self.servers
                      if srv.get("selected") and srv["accord"] == "oui"]
        if not candidates:
            return

    # List at most ten names, followed by the count of the remaining ones
    shown = [entry["server"] for entry in candidates[:10]]
    names = "\n".join(shown)
    extra = len(candidates) - 10
    if extra > 0:
        names += f"\n... et {extra} autres"

    question = (f"Marquer {len(candidates)} serveur(s) en VERT dans le fichier Excel ?"
                f"\n\n{names}\n\nCette action modifie le fichier Excel original.")
    confirmed = messagebox.askyesno(
        "Marquer comme patches dans Excel", question, icon="question")
    if confirmed:
        self._mark_patched_excel(candidates)
def _delete_old_snapshots(self):
    """Prompt for vSphere credentials and an age threshold, then delete old snapshots.

    Opens a modal dialog asking for the vSphere user, the password and a
    threshold in days (default 3, range 1-30). On confirmation the dialog is
    closed and ``_snap_delete_worker`` is started in a daemon thread for the
    selected servers whose ``accord`` is ``"oui"``; that worker deletes the
    snapshots that are OLDER than the threshold.

    Shows a warning and returns when pyVmomi is unavailable or when no
    server is selected.
    """
    if not VSPHERE_OK:
        messagebox.showwarning(
            "pyVmomi manquant",
            "pyVmomi non installe.\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080")
        return
    selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
    if not selected:
        messagebox.showwarning("Attention", "Aucun serveur selectionne.")
        return

    # vSphere prompt
    dlg = tk.Toplevel(self.root)
    dlg.title("Supprimer snapshots vSphere")
    dlg.geometry("400x220")
    dlg.configure(bg=self.BG)
    dlg.grab_set()
    tk.Label(dlg, text="Supprimer snapshots > 3 jours",
             bg=self.BG, fg=self.ACCENT,
             font=("Consolas", 12, "bold")).pack(pady=10)

    # The password field is masked; the user field is shown in clear.
    for lbl, attr, show in [("Utilisateur :", "vsd_user", ""),
                            ("Mot de passe :", "vsd_pwd", "*")]:
        f = tk.Frame(dlg, bg=self.BG)
        f.pack(fill="x", padx=20, pady=4)
        tk.Label(f, text=lbl, bg=self.BG, fg=self.FG,
                 font=("Consolas", 10), width=14).pack(side="left")
        var = tk.StringVar()
        setattr(self, attr, var)
        tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG,
                 font=("Consolas", 10), insertbackground=self.FG).pack(
                     side="left", fill="x", expand=True)

    days_var = tk.IntVar(value=3)
    fd = tk.Frame(dlg, bg=self.BG)
    fd.pack(fill="x", padx=20, pady=4)
    # ">" matches what the worker does: snapshots older than the threshold are deleted
    tk.Label(fd, text="Supprimer snaps > (jours) :", bg=self.BG, fg=self.FG,
             font=("Consolas", 10), width=24).pack(side="left")
    tk.Spinbox(fd, from_=1, to=30, textvariable=days_var, width=5,
               bg=self.BG2, fg=self.FG).pack(side="left")

    def do_delete():
        # Read the fields before destroying the dialog, then work off the Tk thread
        user = self.vsd_user.get().strip()
        pwd = self.vsd_pwd.get().strip()
        days = days_var.get()
        dlg.destroy()
        threading.Thread(target=self._snap_delete_worker,
                         args=(selected, user, pwd, days), daemon=True).start()

    tk.Button(dlg, text="🗑 Supprimer", bg="#e64553", fg="white",
              font=("Consolas", 11, "bold"), pady=6,
              command=do_delete).pack(pady=10)
def _snap_delete_worker(self, servers, vs_user, vs_pwd, max_days):
    """Delete vSphere snapshots older than *max_days* days for the given servers.

    Connects to every host of ``VSPHERE_HOSTS`` that accepts the
    credentials, looks each server up with ``self._find_vm`` (first vCenter
    that knows the VM wins) and walks its whole snapshot tree. A snapshot
    whose age in whole days is strictly greater than *max_days* is removed
    (children are kept); the others are logged as kept.

    NOTE(review): no filter on the snapshot name is applied here, so every
    snapshot older than the threshold is removed, not only pre-patch ones —
    confirm that this is intended.

    Args:
        servers: list of server dicts (key read: ``server``).
        vs_user: vSphere user name.
        vs_pwd: vSphere password.
        max_days: age threshold in days.
    """
    import ssl
    from datetime import timezone

    def flatten(snap_list):
        """Return the snapshots of *snap_list* and all their descendants (parents first)."""
        snaps = []
        for snap in snap_list:
            snaps.append(snap)
            snaps.extend(flatten(snap.childSnapshotList))
        return snaps

    connected_vcs = []
    try:
        for vc in VSPHERE_HOSTS:
            try:
                # Certificate verification is disabled for the vCenter connection
                ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd,
                                  sslContext=ctx, connectionPoolTimeout=10)
                vc_content = si.RetrieveContent()
                self._log_post(f"Connecte vCenter : {vc}", "ok")
                connected_vcs.append((vc, si, vc_content))
            except Exception as e:
                self._log_post(f"Inaccessible : {vc} — {str(e)[:50]}", "ko")
        if not connected_vcs:
            self._log_post("Aucun vCenter joignable", "ko")
            return

        for s in servers:
            for vc, si, vc_content in connected_vcs:
                vm = self._find_vm(vc_content, s["server"])
                if not vm:
                    continue
                self._log_post(f"\n VM : {vm.name} sur {vc}", "info")
                if not vm.snapshot:
                    self._log_post(f" Aucun snapshot", "info")
                    continue
                for snap in flatten(vm.snapshot.rootSnapshotList):
                    snap_time = snap.createTime
                    if snap_time.tzinfo is None:
                        # Naive timestamps are treated as UTC
                        snap_time = snap_time.replace(tzinfo=timezone.utc)
                    age_days = (datetime.now(tz=timezone.utc) - snap_time).days
                    if age_days > max_days:
                        # Older than max_days days -> delete
                        self._log_post(
                            f" Snapshot : {snap.name} ({age_days}j) — SUPPRESSION (>{max_days}j)", "warn")
                        try:
                            task = snap.snapshot.RemoveSnapshot_Task(removeChildren=False)
                            # Poll until the task finishes (no upper bound on the wait)
                            while task.info.state not in ("success", "error"):
                                time.sleep(1)
                            if task.info.state == "success":
                                self._log_post(f" OK supprime : {snap.name}", "ok")
                            else:
                                self._log_post(f" ERREUR suppression : {snap.name}", "ko")
                        except Exception as e:
                            self._log_post(f" ERREUR : {e}", "ko")
                    else:
                        # Recent (<= max_days days) -> keep
                        self._log_post(
                            f" Snapshot : {snap.name} ({age_days}j) — conservé (<= {max_days}j)", "info")
                break  # VM handled on this vCenter: do not look it up on the others
    finally:
        # Close every vCenter session, even if the processing above raised
        for vc, si, _ in connected_vcs:
            try:
                Disconnect(si)
            except Exception as e:
                # Best effort: a failed logout must not hide the actual result
                self._log_post(f"Deconnexion {vc} : {str(e)[:50]}", "warn")
def _remove_old_kernels(self):
    """Remove old kernels on the selected servers after confirmation.

    Targets are the servers flagged ``selected`` with ``accord == "oui"``.
    For each of them ``package-cleanup --oldkernels --count=1`` is run over
    SSH in a daemon thread and the last lines of its output are logged in
    the post-patching log.
    """
    targets = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
    if not targets:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
        return
    if not messagebox.askyesno(
            "Confirmation",
            f"Supprimer les anciens kernels sur {len(targets)} serveur(s) ?"):
        return

    def worker():
        for s in targets:
            self._log_post(f"\n── {s['server']} — Suppression anciens kernels ──", "info")
            client, _ = ssh_connect(s["server"], s["env"], self.settings,
                                    self.pkey, self.pkey2, self.cyb_pwd)
            if not client:
                self._log_post(f" ❌ Connexion impossible", "ko")
                continue
            try:
                out, _ = run_cmd(
                    client, "sudo package-cleanup --oldkernels --count=1 -y 2>&1 | tail -10", 60)
                for line in out.splitlines():
                    self._log_post(f" {line}", "ok" if "Removed" in line else "info")
            finally:
                # Release the SSH session even if the remote command raised
                client.close()

    threading.Thread(target=worker, daemon=True).start()
def _undo_yum(self):
    """Show the yum history of the first selected server and undo a chosen transaction.

    The first server flagged ``selected`` with ``accord == "oui"`` is used.
    A daemon thread fetches ``yum history list``; the operator is then asked
    (on the Tk loop) for a transaction ID, and a second daemon thread runs
    ``yum history undo`` with it.

    The ID is interpolated into a sudo shell command, so only a number or
    the keyword ``last`` is accepted.
    """
    self.audit("UNDO_YUM", "")
    target = next(
        (s for s in self.servers if s.get("selected") and s["accord"] == "oui"),
        None)
    if not target:
        messagebox.showwarning("Attention", "Sélectionner au moins un serveur.")
        return
    self._log_post(f"\n── {target['server']} — Historique yum ──", "info")

    def worker():
        client, _ = ssh_connect(target["server"], target["env"], self.settings,
                                self.pkey, self.pkey2, self.cyb_pwd)
        if not client:
            self._log_post(" ❌ Connexion impossible", "ko")
            return
        try:
            out, _ = run_cmd(client, "sudo yum history list 2>/dev/null | head -20", 20)
        finally:
            client.close()
        self._log_post(out, "info")

        # Ask which transaction ID to undo
        def ask():
            tid = simpledialog.askstring(
                "Undo yum",
                f"Historique yum sur {target['server']} :\n\n{out}\n\nID de transaction à annuler :")
            tid = (tid or "").strip()
            if not tid:
                return
            # Reject anything that is not a plain ID: the value ends up in a shell command
            if not (tid.isdigit() or tid == "last"):
                messagebox.showwarning(
                    "Undo yum", "ID de transaction invalide (nombre attendu).")
                return
            threading.Thread(target=do_undo, args=(tid,), daemon=True).start()

        def do_undo(tid):
            self._log_post(f" Undo transaction {tid}...", "warn")
            c2, _ = ssh_connect(target["server"], target["env"], self.settings,
                                self.pkey, self.pkey2, self.cyb_pwd)
            if not c2:
                self._log_post(" ❌ Connexion impossible", "ko")
                return
            try:
                out2, _ = run_cmd(c2, f"sudo yum history undo {tid} -y 2>&1 | tail -15", 120)
                for line in out2.splitlines():
                    self._log_post(f" {line}", "ok" if "Complete" in line else "info")
            finally:
                c2.close()

        self.root.after(0, ask)

    threading.Thread(target=worker, daemon=True).start()
def _reboot_servers(self):
    """Schedule a reboot (``shutdown -r +1``) on servers after confirmation.

    The selected servers with ``accord == "oui"`` that are flagged
    ``reboot_required`` are used; when there are none, every selected server
    with ``accord == "oui"`` is used instead. The shutdown command is sent
    over SSH from a daemon thread.
    """
    self.audit("REBOOT_START", "")
    reboot_list = [s for s in self.servers
                   if s.get("selected") and s.get("reboot_required") and s["accord"] == "oui"]
    if not reboot_list:
        reboot_list = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
    if not reboot_list:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
        return

    names = "\n".join(s["server"] for s in reboot_list[:10])
    if len(reboot_list) > 10:
        # Make it visible that the list shown in the dialog is truncated
        names += f"\n... et {len(reboot_list)-10} autres"
    if not messagebox.askyesno(
            "⚠️ Confirmation reboot",
            f"Redémarrer {len(reboot_list)} serveur(s) ?\n\n{names}", icon="warning"):
        return

    def worker():
        for s in reboot_list:
            self._log_post(f"\n── {s['server']} — Reboot ──", "warn")
            client, _ = ssh_connect(s["server"], s["env"], self.settings,
                                    self.pkey, self.pkey2, self.cyb_pwd)
            if not client:
                self._log_post(" ❌ Connexion impossible", "ko")
                continue
            try:
                run_cmd(client, "sudo shutdown -r +1 'Reboot post-patching' &", 10)
            finally:
                # Release the SSH session even if the remote command raised
                client.close()
            self._log_post(f" ✅ Reboot planifié dans 1 min", "ok")

    threading.Thread(target=worker, daemon=True).start()
# ==========================================================================
# ONGLET 4 — RAPPORT
# ==========================================================================
def _generate_report(self):
    """Build the visual report: KPI widgets, detailed table and raw text log.

    Everything is computed from ``self.results``; a warning is shown and
    nothing is changed when it is empty. The week number in the header is
    the ISO week of today's date.
    """
    if not self.results:
        messagebox.showwarning("Attention", "Aucun résultat de patch disponible.")
        return
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    week = date.today().isocalendar()[1]

    # ── KPI computation ──
    total = len(self.results)
    ok_c = sum(1 for r in self.results if r.get("status") == "OK")
    p_c = sum(1 for r in self.results if r.get("patch_status") == "PATCHED")
    uptodate = sum(1 for r in self.results if r.get("patch_status") == "UP_TO_DATE")
    rb_c = sum(1 for r in self.results if r.get("reboot_required"))
    ko_c = sum(1 for r in self.results if r.get("status") != "OK")

    # Update the KPI widgets
    kpi_values = {
        "total": total,
        "ok": ok_c,
        "patched": p_c,
        "uptodate": uptodate,
        "reboot": rb_c,
        "ko": ko_c,
    }
    for key, value in kpi_values.items():
        self.kpi_widgets[key].configure(text=str(value))

    # ── Fill the detailed table ──
    for item in self.report_tree.get_children():
        self.report_tree.delete(item)
    for r in self.results:
        accord = "OUI" if r.get("accord") == "oui" else "NON"
        snap = "OK" if r.get("snap_done") else ("PHY" if r.get("is_physical") else "")
        conn = r.get("status", "?")
        ps = r.get("patch_status", "?")
        rb = "OUI" if r.get("reboot_required") else ""
        detail = r.get("patch_detail", "")[:60]
        # Pick the colour tag (first matching rule wins)
        if r.get("status") != "OK":
            tag = "ko"
        elif r.get("patch_status") == "PATCHED":
            tag = "patched"
        elif r.get("reboot_required"):
            tag = "reboot"
        elif r.get("patch_status") == "UP_TO_DATE":
            tag = "uptodate"
        else:
            tag = "ok"
        self.report_tree.insert("", "end", values=(
            r.get("server", ""),
            r.get("env", ""),
            r.get("domain", ""),
            accord,
            snap,
            conn,
            ps,
            rb,
            detail,
        ), tags=(tag,))

    # ── Raw log ──
    self.report_txt.configure(state="normal")
    self.report_txt.delete("1.0", "end")

    def w(txt, tag="info"):
        self.report_txt.insert("end", txt+"\n", tag)

    w(f"{'='*70}", "title")
    # Week number and timestamp are separated so the header stays readable
    w(f" RAPPORT PATCHING — Semaine {week} — {now}", "title")
    w(f" Fichier : {self.excel_var.get()}", "info")
    w(f" Intervenant : {self.settings.get('patcher','')}", "info")
    w(f"{'='*70}", "title")
    w("")
    w(f" RESUME : {total} traites | {ok_c} OK | {p_c} patches | {rb_c} reboot | {ko_c} KO", "info")
    w("")
    w(f" {'SERVEUR':<35} {'STATUT':<12} {'PATCH':<16} {'REBOOT'}", "title")
    w(f" {'-'*66}", "info")
    for r in self.results:
        status = r.get("status", "?")
        ps = r.get("patch_status", "?")
        rb = "REBOOT" if r.get("reboot_required") else ""
        # Later rules override earlier ones: a pending reboot always shows as "warn"
        tag = "ok" if status == "OK" else "ko"
        if r.get("patch_status") == "PATCHED":
            tag = "ok"
        if r.get("reboot_required"):
            tag = "warn"
        w(f" {r.get('server',''):<35} {status:<12} {ps:<16} {rb}", tag)
        if r.get("patch_detail"):
            w(f"{r['patch_detail'][:80]}", "info")
    w("")
    w(f"{'='*70}", "title")
    self.report_txt.configure(state="disabled")
def _mark_patched_excel(self, servers_to_mark):
    """Fill the Excel rows of the given servers with a green background.

    The workbook is edited on a temporary copy and then copied back over the
    original, to avoid OneDrive/lock conflicts. When the original cannot be
    overwritten (``PermissionError``), the edited copy is saved next to it
    under a ``_patched_<timestamp>`` name instead.

    Args:
        servers_to_mark: list of server dicts. ``excel_row`` is used as the
            row number when present; otherwise the row whose first cell
            equals ``server`` is used (first match).
    """
    import shutil
    import tempfile
    filepath = self.excel_var.get().strip()
    sheet = self.sheet_var.get()
    if not filepath or not os.path.exists(filepath):
        messagebox.showerror("Erreur Excel", "Fichier Excel introuvable")
        return
    try:
        from openpyxl import load_workbook
        from openpyxl.styles import PatternFill
        green_fill = PatternFill(start_color="00B050", end_color="00B050", fill_type="solid")
        marked = 0
        backup_path = None

        # Temporary copy, removed in all cases (best effort) by the finally below
        tmp_dir = tempfile.mkdtemp()
        try:
            tmp_path = os.path.join(tmp_dir, os.path.basename(filepath))
            shutil.copy2(filepath, tmp_path)
            wb = load_workbook(tmp_path)
            try:
                ws = wb[sheet]
                last_col = ws.max_column
                rows_by_name = None  # first-column value -> row number, built on demand
                for s in servers_to_mark:
                    row_num = s.get("excel_row")
                    if not row_num:
                        if rows_by_name is None:
                            # Scan the sheet once; setdefault keeps the first match
                            rows_by_name = {}
                            for row in ws.iter_rows():
                                rows_by_name.setdefault(row[0].value, row[0].row)
                        row_num = rows_by_name.get(s["server"])
                    if row_num:
                        for col in range(1, last_col + 1):
                            ws.cell(row=row_num, column=col).fill = green_fill
                        marked += 1
                # Save into the temporary copy
                wb.save(tmp_path)
            finally:
                wb.close()

            # Try to copy the result back over the original
            try:
                shutil.copy2(tmp_path, filepath)
            except PermissionError:
                # Original is locked: save next to it, keeping its own extension
                base, ext = os.path.splitext(filepath)
                backup_path = f"{base}_patched_{datetime.now():%Y%m%d_%H%M}{ext}"
                shutil.copy2(tmp_path, backup_path)
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

        if backup_path is None:
            self._log_patch(f" {marked} ligne(s) marquee(s) en vert dans Excel", "ok")
            messagebox.showinfo(
                "Excel mis a jour",
                f"{marked} serveur(s) marques comme patches (fond vert) dans :\n{filepath}")
        else:
            self._log_patch(f" Excel verrouille, sauvegarde dans : {backup_path}", "warn")
            messagebox.showwarning(
                "Excel verrouille",
                f"Le fichier Excel est verrouille (OneDrive).\n\n"
                f"Une copie a ete sauvegardee :\n{backup_path}\n\n"
                f"Fermez le fichier original puis copiez la version patchee.")
        self.audit("MARK_EXCEL", f"{marked} serveurs marques, sheet={sheet}")
    except Exception as e:
        messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}")
def _export_csv(self):
    """Write ``self.results`` to a timestamped CSV file and show its path.

    The file goes to ``~/Documents/patch_reports`` (created if needed).
    Only the columns listed below are written; other keys are ignored.
    """
    if not self.results:
        messagebox.showwarning("Attention","Aucun résultat.")
        return

    out_dir = os.path.join(os.path.expanduser("~"), "Documents", "patch_reports")
    os.makedirs(out_dir, exist_ok=True)
    week_no = date.today().isocalendar()[1]
    csv_file = os.path.join(
        out_dir, f"patch_S{week_no}_{datetime.now():%Y%m%d_%H%M%S}.csv")

    columns = [
        "server", "env", "domain", "app", "accord", "status",
        "patch_status", "patch_detail", "reboot_required",
        "snap_done", "is_physical",
    ]
    with open(csv_file, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(self.results)

    messagebox.showinfo("Export CSV", f"Fichier : {csv_file}")
def _export_dokuwiki(self):
    """Write ``self.results`` as a DokuWiki page (summary + detail tables).

    The file goes to ``~/Documents/patch_reports`` (created if needed) and
    its path is shown in a dialog. The week number is the ISO week of
    today's date.
    """
    if not self.results:
        messagebox.showwarning("Attention","Aucun résultat.")
        return
    rep_dir = os.path.join(os.path.expanduser("~"), "Documents", "patch_reports")
    os.makedirs(rep_dir, exist_ok=True)
    # Computed once so the file name and the page title always agree
    week = date.today().isocalendar()[1]
    wiki_file = os.path.join(rep_dir, f"patch_wiki_S{week}_{datetime.now():%Y%m%d}.txt")

    p_c = sum(1 for r in self.results if r.get("patch_status") == "PATCHED")
    rb_c = sum(1 for r in self.results if r.get("reboot_required"))
    ko_c = sum(1 for r in self.results if r.get("status") != "OK")

    with open(wiki_file, "w", encoding="utf-8") as f:
        f.write(f"====== Rapport Patching — Semaine {week} ======\n\n")
        f.write(f"//Généré le {datetime.now():%d/%m/%Y %H:%M} — MYPCZEN / SANEF DSI-SOC//\n\n")
        f.write("===== Résumé =====\n\n")
        f.write("^ Serveurs traités ^ Patchés ^ Reboot requis ^ KO ^\n")
        f.write(f"| {len(self.results)} | {p_c} | {rb_c} | {ko_c} |\n\n")
        f.write("===== Détail =====\n\n")
        f.write("^ Serveur ^ Env ^ Statut ^ Patch ^ Reboot ^ Accord ^ Snapshot ^\n")
        for r in self.results:
            rb = "⚠ OUI" if r.get("reboot_required") else "NON"
            # .get() everywhere: one incomplete result must not abort the export
            f.write(f"| {r.get('server','')} | {r.get('env','')} | {r.get('status','?')} | "
                    f"{r.get('patch_status','?')} | {rb} | "
                    f"{'OUI' if r.get('accord')=='oui' else 'NON'} | "
                    f"{'OUI' if r.get('snap_done') else 'NON'} |\n")
    messagebox.showinfo("Export DokuWiki", f"Fichier : {wiki_file}")
# ==========================================================================
# ONGLET 5 — AUDIT
# ==========================================================================
def _build_tab_audit(self):
    """Build the audit tab: header bar with action buttons, log table and scrollbar.

    The table is filled once at the end through ``_refresh_audit``.
    """
    tab = self.tab_audit
    header_bg = "#181825"
    small_font = ("Consolas", 9)

    # Header bar: title on the left, buttons stacked from the right
    header = tk.Frame(tab, bg=header_bg, pady=6)
    header.pack(fill="x")
    tk.Label(header, text="Journal d'audit", bg=header_bg, fg=self.ACCENT,
             font=("Consolas", 13, "bold")).pack(side="left", padx=12)
    tk.Button(header, text="Rafraichir", bg=self.BTN, fg=self.FG, font=small_font,
              command=self._refresh_audit).pack(side="right", padx=12)
    tk.Button(header, text="Exporter CSV", bg=self.BTN, fg=self.FG, font=small_font,
              command=self._export_audit_csv).pack(side="right", padx=4)
    tk.Button(header, text="Changer mon mot de passe", bg="#fab387", fg="#1e1e2e",
              font=small_font, command=self._change_my_password).pack(side="right", padx=4)

    # Log table: (column id, heading text, width)
    layout = [
        ("timestamp", "Date/Heure", 160),
        ("user", "Utilisateur", 120),
        ("action", "Action", 150),
        ("details", "Details", 500),
    ]
    self.audit_tree = ttk.Treeview(
        tab, columns=tuple(col_id for col_id, _, _ in layout), show="headings")
    for col_id, title, _ in layout:
        self.audit_tree.heading(col_id, text=title)
    for col_id, _, width in layout:
        self.audit_tree.column(col_id, width=width)

    scroll = ttk.Scrollbar(tab, orient="vertical", command=self.audit_tree.yview)
    self.audit_tree.configure(yscrollcommand=scroll.set)
    self.audit_tree.pack(side="left", fill="both", expand=True, padx=6, pady=4)
    scroll.pack(side="right", fill="y", pady=4)

    self._refresh_audit()
def _refresh_audit(self):
    """Reload the audit table with up to 500 log entries from the database."""
    tree = self.audit_tree
    tree.delete(*tree.get_children())
    for entry in self.db.get_logs(500):
        row = (entry["timestamp"], entry["username"],
               entry["action"], entry.get("details", ""))
        tree.insert("", "end", values=row)
def _export_audit_csv(self):
    """Export up to 5000 audit log entries to a CSV file chosen by the user.

    The file is written with ``;`` as delimiter and ``utf-8-sig`` encoding;
    the export itself is then recorded through ``self.audit``.
    """
    logs = self.db.get_logs(5000)
    if not logs:
        messagebox.showinfo("Info", "Aucun log")
        return

    suggested = f"audit_patch_{datetime.now():%Y%m%d_%H%M%S}.csv"
    p = filedialog.asksaveasfilename(
        defaultextension=".csv",
        filetypes=[("CSV", "*.csv")],
        initialfile=suggested)
    if not p:
        return

    with open(p, "w", newline="", encoding="utf-8-sig") as handle:
        writer = csv.writer(handle, delimiter=";")
        writer.writerow(["Timestamp", "Utilisateur", "Action", "Details"])
        writer.writerows(
            [entry["timestamp"], entry["username"], entry["action"], entry.get("details", "")]
            for entry in logs)

    self.audit("EXPORT_AUDIT", f"{len(logs)} entries -> {p}")
def _change_my_password(self):
    """Open the change-password dialog for the current user and confirm on success."""
    username = self.current_user["username"]
    dialog = ChangePasswordDialog(self.root, self.db, username)
    if not dialog.success:
        return
    messagebox.showinfo("OK", "Mot de passe modifie avec succes")
# ==========================================================================
# ONGLET 6 — UTILISATEURS (admin only)
# ==========================================================================
def _build_tab_users(self):
    """Build the users tab: creation form, user table and action buttons.

    The table is filled once at the end through ``_refresh_users``.
    """
    tab = self.tab_users
    bg, field_bg, fg = self.BG, self.BG2, self.FG
    text_font = ("Consolas", 10)
    small_font = ("Consolas", 9)
    small_bold = ("Consolas", 9, "bold")

    # Title bar
    title_bar = tk.Frame(tab, bg="#181825", pady=6)
    title_bar.pack(fill="x")
    tk.Label(title_bar, text="Gestion des utilisateurs", bg="#181825", fg=self.ACCENT,
             font=("Consolas", 13, "bold")).pack(side="left", padx=12)

    # Creation form
    form = tk.Frame(tab, bg=bg, pady=6)
    form.pack(fill="x", padx=12)

    def caption(text, padx):
        tk.Label(form, text=text, bg=bg, fg=fg, font=text_font).pack(side="left", padx=padx)

    caption("Utilisateur :", (0, 4))
    self.new_user_var = tk.StringVar()
    tk.Entry(form, textvariable=self.new_user_var, bg=field_bg, fg=fg,
             font=text_font, width=15, insertbackground=fg).pack(side="left", padx=4)

    caption("MDP :", (8, 4))
    self.new_pwd_var = tk.StringVar()
    tk.Entry(form, textvariable=self.new_pwd_var, bg=field_bg, fg=fg,
             font=text_font, width=15, show="*", insertbackground=fg).pack(side="left", padx=4)

    caption("Role :", (8, 4))
    self.new_role_var = tk.StringVar(value="operator")
    tk.OptionMenu(form, self.new_role_var, "admin", "operator", "viewer").pack(side="left", padx=4)

    tk.Button(form, text="Creer", bg="#40a02b", fg="white", font=small_bold,
              command=self._create_user).pack(side="left", padx=8)

    # User table: (column id, heading text, width)
    layout = [
        ("username", "Utilisateur", 150),
        ("role", "Role", 100),
        ("locked", "Verrouille", 100),
        ("must_change", "Chg. MDP", 100),
        ("last_login", "Derniere connexion", 180),
    ]
    self.users_tree = ttk.Treeview(
        tab, columns=tuple(col_id for col_id, _, _ in layout), show="headings")
    for col_id, title, _ in layout:
        self.users_tree.heading(col_id, text=title)
    for col_id, _, width in layout:
        self.users_tree.column(col_id, width=width)
    self.users_tree.pack(fill="both", expand=True, padx=12, pady=6)

    # Action buttons
    actions = tk.Frame(tab, bg=bg, pady=6)
    actions.pack(fill="x", padx=12)
    tk.Button(actions, text="Supprimer", bg="#f38ba8", fg="#1e1e2e",
              font=small_bold, command=self._delete_user).pack(side="left", padx=4)
    tk.Button(actions, text="Deverrouiller", bg="#fab387", fg="#1e1e2e",
              font=small_bold, command=self._unlock_user).pack(side="left", padx=4)
    tk.Button(actions, text="Reset MDP", bg=self.BTN, fg=fg,
              font=small_font, command=self._reset_user_pwd).pack(side="left", padx=4)
    tk.Button(actions, text="Rafraichir", bg=self.BTN, fg=fg,
              font=small_font, command=self._refresh_users).pack(side="right", padx=4)

    self._refresh_users()
def _refresh_users(self):
    """Reload the user table from ``self.db.list_users()``.

    Boolean flags are shown as ``"OUI"`` or an empty cell; a missing last
    login is shown as an empty cell.
    """
    tree = self.users_tree
    tree.delete(*tree.get_children())
    for account in self.db.list_users():
        locked_txt = "OUI" if account["locked"] else ""
        change_txt = "OUI" if account["must_change_pwd"] else ""
        last_seen = account["last_login"] or ""
        tree.insert("", "end", values=(
            account["username"], account["role"], locked_txt, change_txt, last_seen))
def _create_user(self):
    """Create a user from the form fields, audit it and refresh the table.

    Shows a warning when the name or password is empty, and an error when
    ``self.db.create_user`` reports a failure.
    """
    un = self.new_user_var.get().strip()
    pw = self.new_pwd_var.get().strip()
    role = self.new_role_var.get()
    if not (un and pw):
        messagebox.showwarning("Erreur", "Saisir un nom et un mot de passe")
        return

    created = self.db.create_user(un, pw, role)
    if not created:
        messagebox.showerror("Erreur", f"L'utilisateur '{un}' existe deja")
        return

    self.audit("CREATE_USER", f"{un} (role={role})")
    # Clear the form once the account exists
    self.new_user_var.set("")
    self.new_pwd_var.set("")
    self._refresh_users()
def _delete_user(self):
    """Delete the user selected in the table after confirmation.

    The account named ``admin`` is refused. Does nothing when no row is
    selected.
    """
    picked = self.users_tree.selection()
    if not picked:
        return
    un = self.users_tree.item(picked[0])["values"][0]
    if un == "admin":
        messagebox.showwarning("Erreur", "Impossible de supprimer admin")
        return
    if not messagebox.askyesno("Confirmer", f"Supprimer '{un}' ?"):
        return
    self.db.delete_user(un)
    self.audit("DELETE_USER", un)
    self._refresh_users()
def _unlock_user(self):
    """Unlock the user selected in the table, audit it and refresh the table.

    Does nothing when no row is selected.
    """
    picked = self.users_tree.selection()
    if picked:
        un = self.users_tree.item(picked[0])["values"][0]
        self.db.unlock_user(un)
        self.audit("UNLOCK_USER", un)
        self._refresh_users()
def _reset_user_pwd(self):
    """Ask for a new password for the selected user and apply it.

    Does nothing when no row is selected or when the prompt returns an
    empty value.
    """
    picked = self.users_tree.selection()
    if not picked:
        return
    un = self.users_tree.item(picked[0])["values"][0]
    new_pwd = simpledialog.askstring(
        "Reset", f"Nouveau mot de passe pour {un} :", show="*")
    if not new_pwd:
        return
    self.db.reset_password(un, new_pwd)
    self.audit("RESET_PASSWORD", un)
    self._refresh_users()
# ==============================================================================
# MAIN
# ==============================================================================
if __name__ == "__main__":
    # Script entry point: open the database, authenticate the user, then
    # start the main window.
    db = Database()
    root = tk.Tk()
    root.withdraw()  # Keep the main window hidden during login
    login = LoginDialog(root, db)
    if not login.result:
        # Falsy login result: close the application
        root.destroy()
        sys.exit(0)
    user = login.result
    # Force a password change when the account is flagged must_change_pwd
    if user.get("must_change_pwd"):
        dlg = ChangePasswordDialog(root, db, user["username"], forced=True)
        if not dlg.success:
            # The forced change did not succeed: do not open the application
            root.destroy()
            sys.exit(0)
    root.deiconify()
    app = PatchManagerV2(root, db, user)
    root.mainloop()