sanef_patch_manager_v2/patch_manager_v2.py

4232 lines
193 KiB
Python

#!/usr/bin/env python3
# ==============================================================================
# patch_manager_v2.py — SANEF Linux Patch Manager GUI v2
# Gestion complète du patching Linux via Excel + SSH + vSphere
#
# Prérequis : py -m pip install paramiko openpyxl requests pyVmomi --proxy http://proxy.sanef.fr:8080
# Usage : py patch_manager_v2.py
# Auteur : MYPCZEN / SANEF DSI - SOC
# ==============================================================================
import tkinter as tk
from tkinter import ttk, filedialog, scrolledtext, messagebox, simpledialog
import threading
import paramiko
import socket
import time
import csv
import os
import sys
import json
from datetime import datetime, date
from copy import deepcopy
import sqlite3
import hashlib
import secrets
import ctypes
# openpyxl optionnel — installé si absent
try:
import openpyxl
except ImportError:
os.system("py -m pip install openpyxl --proxy http://proxy.sanef.fr:8080 -q")
import openpyxl
# pyVmomi optionnel — pour vSphere
try:
from pyVim.connect import SmartConnect, Disconnect
from pyVmomi import vim
VSPHERE_OK = True
except ImportError:
VSPHERE_OK = False
# ==============================================================================
# CONSTANTES
# ==============================================================================
VERSION = "2.0"
VSPHERE_HOSTS = [
"vpgesavcs1.sanef.groupe",
"vpmetavcs1.sanef.groupe",
"vpsicavcs1.sanef.groupe",
]
# Ordre vCenter selon le prefixe du hostname
VCENTER_METIER = "vpmetavcs1.sanef.groupe"
VCENTER_GESTION = "vpgesavcs1.sanef.groupe"
VCENTER_SICA = "vpsicavcs1.sanef.groupe"
PREFIXES_METIER = ("vp", "vi", "sp")
PREFIXES_SICA = ("si",)
def get_vcenter_order_for_server(server_name):
"""Retourne la liste des vCenters ordonnee selon le prefixe du hostname.
vp*, vi*, sp* → metier first ; si* → sica first ; autres → gestion first"""
prefix = server_name[:2].lower() if len(server_name) >= 2 else ""
if prefix in PREFIXES_SICA:
return [VCENTER_SICA, VCENTER_METIER, VCENTER_GESTION]
elif prefix in PREFIXES_METIER:
return [VCENTER_METIER, VCENTER_GESTION, VCENTER_SICA]
else:
return [VCENTER_GESTION, VCENTER_METIER, VCENTER_SICA]
YUM_EXCLUDES_STD = (
"--exclude=*mongodb* --exclude=*mysql* --exclude=*postgres* "
"--exclude=*mariadb* --exclude=*oracle* --exclude=*pgdg* --exclude=*php* "
"--exclude=*java* --exclude=*redis* --exclude=*elasticsearch* --exclude=*nginx* "
"--exclude=*mod_ssl* --exclude=*haproxy* --exclude=*certbot* "
"--exclude=*python-certbot* --exclude=*docker* --exclude=*podman* "
"--exclude=*centreon* --exclude=*qwserver* "
"--exclude=*ansible* --exclude=*node* --exclude=*tina* --exclude=*memcached* "
"--exclude=*nextcloud* --exclude=*pgbouncer* --exclude=*pgpool* "
"--exclude=*pgbadger* --exclude=*psycopg2* --exclude=*barman* --exclude=*kibana* "
"--exclude=*sdcss*" # Symantec DCS agent + sdcss-kmod kernel module
)
YUM_EXCLUDES_FLUX_LIBRE = "--exclude=*podman*"
PRE_PATCH_SCRIPT = r"""
cat > /tmp/secops_pre_patching.sh << 'EOF'
HOSTNAME=$(hostname)
SNAPSHOT_DIR="/tmp"
systemctl list-units --type=service --state=running --no-pager \
| awk '{print $1}' | grep '\.service$' \
> ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt
echo "Services sauvegardes : $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt)"
ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt
ss -tlnup | awk 'NR>1 && $7 != "" {
match($7, /users:\(\("([^"]+)"/, arr)
split($5, addr, ":")
port = addr[length(addr)]
if (arr[1] != "" && port+0 < 32768) print port, arr[1]
}' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt
echo "Ports sauvegardes : $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt)"
echo "PRE_PATCH_OK"
EOF
bash /tmp/secops_pre_patching.sh
"""
POST_PATCH_SCRIPT = r"""
cat > /tmp/secops_post_patching.sh << 'EOF'
HOSTNAME=$(hostname)
SNAPSHOT_DIR="/tmp"
GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m'
RAPPORT="/tmp/rapport_patching_${HOSTNAME}_$(date +%Y%m%d_%H%M).txt"
systemctl list-units --type=service --state=running --no-pager \
| awk '{print $1}' | grep '\.service$' \
> ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt
DISPARUS_SVC=$(comm -23 \
<(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \
| grep -v "user@")
APPARUS_SVC=$(comm -13 \
<(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \
| grep -v "setroubleshootd\|user@")
echo "--- SERVICES ---" | tee -a ${RAPPORT}
echo "Avant: $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) | Apres: $(wc -l < ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT}
[ -z "$DISPARUS_SVC" ] && echo "OK - Aucun service disparu" | tee -a ${RAPPORT} || { echo "ALERTE services disparus:" | tee -a ${RAPPORT}; echo "$DISPARUS_SVC" | tee -a ${RAPPORT}; }
[ -z "$APPARUS_SVC" ] && echo "OK - Aucun nouveau service" | tee -a ${RAPPORT} || { echo "WARN nouveaux services:" | tee -a ${RAPPORT}; echo "$APPARUS_SVC" | tee -a ${RAPPORT}; }
ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt
ss -tlnup | awk 'NR>1 && $7 != "" {
match($7, /users:\(\("([^"]+)"/, arr)
split($5, addr, ":")
port = addr[length(addr)]
if (arr[1] != "" && port+0 < 32768) print port, arr[1]
}' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt
PORTS_DISPARUS=$(comm -23 \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt))
PORTS_APPARUS=$(comm -13 \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt))
echo "--- PORTS ---" | tee -a ${RAPPORT}
echo "Avant: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt) | Apres: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT}
[ -z "$PORTS_DISPARUS" ] && echo "OK - Aucun port disparu" | tee -a ${RAPPORT} || { echo "ALERTE ports disparus:" | tee -a ${RAPPORT}; echo "$PORTS_DISPARUS" | tee -a ${RAPPORT}; }
[ -z "$PORTS_APPARUS" ] && echo "OK - Aucun nouveau port" | tee -a ${RAPPORT} || { echo "WARN nouveaux ports:" | tee -a ${RAPPORT}; echo "$PORTS_APPARUS" | tee -a ${RAPPORT}; }
echo "" | tee -a ${RAPPORT}
echo "RAPPORT: ${RAPPORT}"
echo "POST_PATCH_OK"
EOF
bash /tmp/secops_post_patching.sh
"""
FL_PRE_PATCH_SCRIPT = r"""
cat > /tmp/secops_fl_pre_patch.sh << 'FLEOF'
# Detecter l'utilisateur applicatif podman
APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1)
if [ -z "$APP_USER" ]; then
echo "FL_NO_PODS"
exit 0
fi
echo "FL_USER=$APP_USER"
# Snapshot pods running via sudo su
sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}" --filter "status=running"' \
> /tmp/fl_pods_avant_$(hostname)_$(date +%Y%m%d_%H%M).txt
echo "FL_PODS_SAVED=$(wc -l < /tmp/fl_pods_avant_$(hostname)_*.txt | tail -1)"
# Status boo_manage si disponible
if command -v boo_manage &>/dev/null || sudo su - $APP_USER -c "which boo_manage" &>/dev/null; then
sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true
fi
echo "FL_PRE_OK"
FLEOF
bash /tmp/secops_fl_pre_patch.sh
"""
FL_POST_PATCH_SCRIPT = r"""
cat > /tmp/secops_fl_post_patch.sh << 'FLEOF'
APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1)
if [ -z "$APP_USER" ]; then
echo "FL_NO_PODS"
exit 0
fi
echo "FL_USER=$APP_USER"
SNAPSHOT=$(ls -t /tmp/fl_pods_avant_$(hostname)_*.txt 2>/dev/null | head -1)
if [ -z "$SNAPSHOT" ]; then
echo "FL_NO_SNAPSHOT"
sudo su - $APP_USER -c "boo_manage -a -c start" 2>/dev/null || true
exit 0
fi
AVANT=$(cat $SNAPSHOT)
APRES=$(sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}"')
ARRETES=0
while IFS= read -r pod; do
if ! echo "$APRES" | grep -q "^${pod}$"; then
echo "FL_RESTART=$pod"
sudo su - $APP_USER -c "boo_manage -p ${pod} -c start" 2>/dev/null || \
sudo su - $APP_USER -c "XDG_RUNTIME_DIR=/run/user/\$(id -u) podman start ${pod}" 2>/dev/null
ARRETES=$((ARRETES + 1))
fi
done <<< "$AVANT"
if [ $ARRETES -eq 0 ]; then
echo "FL_ALL_OK"
else
echo "FL_RESTARTED=$ARRETES"
fi
sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true
echo "FL_POST_OK"
FLEOF
bash /tmp/secops_fl_post_patch.sh
"""
PRESET_PACKAGES = {
"Patch global (avec excludes)": "",
"Java": "java-1.8.0-openjdk java-1.8.0-openjdk-headless",
"OpenSSL": "openssl",
"OpenSSH": "openssh openssh-server openssh-clients",
"gnutls + libsoup + rhc": "gnutls libsoup rhc",
"glibc": "glibc",
"mariadb-libs": "mariadb-libs",
"curl": "curl libcurl",
"Personnalise...": "CUSTOM",
}
SETTINGS_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_settings.json")
DB_PATH = os.path.join(os.path.expanduser("~"), ".patch_manager.db")
# ==============================================================================
# DATABASE — Auth + Audit
# ==============================================================================
_db_lock = threading.Lock()
def _hash_password(password, salt=None):
if salt is None:
salt = secrets.token_hex(16)
h = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 310_000)
return f"{salt}${h.hex()}"
def _verify_password(password, stored):
if "$" not in stored:
return False
salt, _ = stored.split("$", 1)
return _hash_password(password, salt) == stored
class Database:
def __init__(self):
self.conn = sqlite3.connect(DB_PATH, check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self._init_tables()
def _exec(self, sql, params=()):
with _db_lock:
return self.conn.execute(sql, params)
def _commit(self):
with _db_lock:
self.conn.commit()
def _init_tables(self):
with _db_lock:
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'operator',
must_change_pwd INTEGER NOT NULL DEFAULT 1,
failed_attempts INTEGER NOT NULL DEFAULT 0,
locked INTEGER NOT NULL DEFAULT 0,
created_at TEXT DEFAULT (datetime('now','localtime')),
last_login TEXT
);
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT DEFAULT (datetime('now','localtime')),
username TEXT,
action TEXT NOT NULL,
details TEXT
);
""")
# Compte admin par defaut — mot de passe aléatoire unique
row = self.conn.execute("SELECT id FROM users WHERE username='admin'").fetchone()
if not row:
init_pwd = secrets.token_urlsafe(16)
h = _hash_password(init_pwd)
self.conn.execute(
"INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,?)",
("admin", h, "admin", 1))
self._init_password = init_pwd # Affiché une seule fois au login
self.conn.commit()
def authenticate(self, username, password):
row = self._exec("SELECT * FROM users WHERE LOWER(username)=LOWER(?)", (username,)).fetchone()
if not row:
return None, "Utilisateur inconnu"
if row["locked"]:
return None, "Compte verrouille (3 tentatives). Contactez l'admin."
if not _verify_password(password, row["password_hash"]):
attempts = row["failed_attempts"] + 1
if attempts >= 3:
self._exec("UPDATE users SET failed_attempts=?, locked=1 WHERE id=?", (attempts, row["id"]))
else:
self._exec("UPDATE users SET failed_attempts=? WHERE id=?", (attempts, row["id"]))
self._commit()
remaining = 3 - attempts
if remaining <= 0:
return None, "Compte verrouille apres 3 echecs."
return None, f"Mot de passe incorrect ({remaining} essai(s) restant(s))"
# Succes
self._exec("UPDATE users SET failed_attempts=0, last_login=datetime('now','localtime') WHERE id=?",
(row["id"],))
self._commit()
return dict(row), None
def change_password(self, username, new_password):
h = _hash_password(new_password)
self._exec("UPDATE users SET password_hash=?, must_change_pwd=0 WHERE username=?", (h, username))
self._commit()
def create_user(self, username, password, role="operator"):
try:
h = _hash_password(password)
self._exec("INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,1)",
(username, h, role))
self._commit()
return True
except sqlite3.IntegrityError:
return False
def delete_user(self, username):
self._exec("DELETE FROM users WHERE username=? AND username != 'admin'", (username,))
self._commit()
def unlock_user(self, username):
self._exec("UPDATE users SET locked=0, failed_attempts=0 WHERE username=?", (username,))
self._commit()
def reset_password(self, username, new_password):
h = _hash_password(new_password)
self._exec("UPDATE users SET password_hash=?, must_change_pwd=1, locked=0, failed_attempts=0 WHERE username=?",
(h, username))
self._commit()
def list_users(self):
return [dict(r) for r in self._exec("SELECT * FROM users ORDER BY username").fetchall()]
def log_action(self, username, action, details=""):
self._exec("INSERT INTO audit_log (username, action, details) VALUES (?,?,?)",
(username, action, details))
self._commit()
def get_logs(self, limit=500):
return [dict(r) for r in self._exec(
"SELECT * FROM audit_log ORDER BY id DESC LIMIT ?", (limit,)).fetchall()]
# ==============================================================================
# SPLUNK HEC — envoi de logs vers Splunk
# ==============================================================================
def send_to_splunk(settings, event_data):
"""Envoie un evenement vers Splunk via HEC (HTTP Event Collector).
settings doit contenir: splunk_url, splunk_token, splunk_index, splunk_enabled
event_data: dict avec les champs a envoyer"""
if not settings.get("splunk_enabled") or settings.get("splunk_enabled") == "false":
return
url = settings.get("splunk_url", "").strip()
token = settings.get("_splunk_token_session", settings.get("splunk_token", "")).strip()
if not url or not token:
return
try:
import urllib.request
import ssl
payload = json.dumps({
"index": settings.get("splunk_index", "main"),
"sourcetype": "patch_manager",
"source": "sanef_patch_manager_v2",
"host": socket.gethostname(),
"event": event_data,
}).encode("utf-8")
req = urllib.request.Request(
url.rstrip("/") + "/services/collector/event",
data=payload,
headers={
"Authorization": f"Splunk {token}",
"Content-Type": "application/json",
},
method="POST")
# Proxy SANEF si configure
proxy_url = settings.get("proxy_url", "")
if proxy_url:
handler = urllib.request.ProxyHandler({"https": proxy_url, "http": proxy_url})
opener = urllib.request.build_opener(handler)
else:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_peer = False
handler = urllib.request.HTTPSHandler(context=ctx)
opener = urllib.request.build_opener(handler)
opener.open(req, timeout=5)
except Exception:
pass # Silencieux — ne jamais bloquer le patching pour un log
# ==============================================================================
# WINDOW CENTERING — toujours sur le meme ecran
# ==============================================================================
def center_window(win, width=None, height=None, parent=None):
"""Centre une fenetre sur le meme ecran que le parent (ou le curseur si pas de parent)"""
win.update_idletasks()
if width is None:
width = win.winfo_width()
if height is None:
height = win.winfo_height()
try:
user32 = ctypes.windll.user32
class POINT(ctypes.Structure):
_fields_ = [("x", ctypes.c_long), ("y", ctypes.c_long)]
class RECT(ctypes.Structure):
_fields_ = [("left", ctypes.c_long), ("top", ctypes.c_long),
("right", ctypes.c_long), ("bottom", ctypes.c_long)]
class MONITORINFO(ctypes.Structure):
_fields_ = [("cbSize", ctypes.c_ulong), ("rcMonitor", RECT),
("rcWork", RECT), ("dwFlags", ctypes.c_ulong)]
MONITOR_DEFAULTTONEAREST = 2
if parent is not None:
# Centrer sur l'ecran du parent
px = parent.winfo_x() + parent.winfo_width() // 2
py = parent.winfo_y() + parent.winfo_height() // 2
pt = POINT(px, py)
else:
# Centrer sur l'ecran du curseur
pt = POINT()
user32.GetCursorPos(ctypes.byref(pt))
hmon = user32.MonitorFromPoint(pt, MONITOR_DEFAULTTONEAREST)
mi = MONITORINFO()
mi.cbSize = ctypes.sizeof(MONITORINFO)
user32.GetMonitorInfoW(hmon, ctypes.byref(mi))
work = mi.rcWork
sx = work.left + (work.right - work.left - width) // 2
sy = work.top + (work.bottom - work.top - height) // 2
except Exception:
if parent is not None:
sx = parent.winfo_x() + (parent.winfo_width() - width) // 2
sy = parent.winfo_y() + (parent.winfo_height() - height) // 2
else:
sx = (win.winfo_screenwidth() - width) // 2
sy = (win.winfo_screenheight() - height) // 2
win.geometry(f"{width}x{height}+{sx}+{sy}")
# ==============================================================================
# SETTINGS
# ==============================================================================
DEFAULT_SETTINGS = {
"keyfile": r"C:\scripts\id_rsa_cybglobal.pem",
"keyfile2": r"C:\scripts\id_rsa_cybsecope.ppk",
"cybr_user": "CYBP01336",
"target_user": "cybsecope",
"psmp": "psmp.sanef.fr",
"ssh_port": 22,
"timeout": 20,
"parallelism": 3,
"excel_file": "",
"patcher": "",
"vs_user": "",
"splunk_enabled": "false",
"splunk_url": "",
"splunk_index": "main",
"proxy_url": "http://proxy.sanef.fr:8080",
"sharepoint_notif_path": "",
"network_log_path": "",
}
def load_settings():
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, "r") as f:
s = json.load(f)
DEFAULT_SETTINGS.update(s)
except Exception:
pass
return dict(DEFAULT_SETTINGS)
def save_settings(settings):
try:
with open(SETTINGS_FILE, "w") as f:
json.dump(settings, f, indent=2)
except Exception:
pass
# ==============================================================================
# EXCEL READER
# ==============================================================================
def get_week_sheet(wb):
"""Trouve la feuille dont le nom correspond à la semaine courante"""
week_num = date.today().isocalendar()[1]
candidates = [f"S{week_num}", f"S{week_num:02d}",
f"Semaine {week_num}", f"W{week_num}", str(week_num)]
for name in candidates:
if name in wb.sheetnames:
return name
# Retourner toutes les feuilles pour que l'utilisateur choisisse
return None
# Couleur vert Excel = patché OK
EXCEL_GREEN = "FF00B050"
def read_excel_servers(filepath, sheet_name=None):
"""Lit le fichier Excel et retourne la liste des serveurs Linux + couleurs.
Utilise une copie temporaire pour eviter les conflits OneDrive."""
import shutil, tempfile, io
tmp_dir = tempfile.mkdtemp()
tmp_path = os.path.join(tmp_dir, os.path.basename(filepath))
wb = None
# Methode 1: copie fichier
try:
shutil.copy2(filepath, tmp_path)
wb = openpyxl.load_workbook(tmp_path, data_only=True)
except Exception:
pass
# Methode 2: lecture binaire en memoire (bypass lock OneDrive)
if wb is None:
try:
with open(filepath, "rb") as f:
data = io.BytesIO(f.read())
wb = openpyxl.load_workbook(data, data_only=True)
except Exception:
pass
# Methode 3: ouverture directe
if wb is None:
wb = openpyxl.load_workbook(filepath, data_only=True)
if sheet_name is None:
sheet_name = get_week_sheet(wb)
if sheet_name is None:
return None, wb.sheetnames # retourner la liste pour que l'user choisisse
ws = wb[sheet_name]
# Lire les en-têtes (première ligne non vide)
headers = {}
header_row = None
for row_idx, row in enumerate(ws.iter_rows(max_row=5), start=1):
vals = [c.value for c in row if c.value is not None]
if len(vals) >= 3:
headers = {str(c.value).strip().lower(): c.column - 1
for c in row if c.value is not None}
header_row = row_idx
break
if not headers:
return [], wb.sheetnames
# Mapping champs (tolérance casse/accents)
def find_col(keys):
for k in keys:
for h, idx in headers.items():
if k.lower() in h.lower():
return idx
return None
col_server = find_col(["asset name", "nom du serveur", "serveur", "hostname", "server", "nom"])
col_os = find_col(["os", "système", "systeme"])
col_env = find_col(["environnement", "environment", "env"])
col_domain = find_col(["domaine", "domain"])
col_app = find_col(["nom complet", "application", "nom_complet"])
col_accord = find_col(["accord"])
col_date = find_col(["date du patch", "date patch", "date heure", "date_heure", "date patching", "date"])
col_patcher = find_col(["intervenant", "patcheur", "technicien"])
servers = []
for row in ws.iter_rows(min_row=header_row + 1):
vals = [c.value for c in row]
cells = list(row)
if not vals or all(v is None for v in vals):
continue
def get(col):
if col is None or col >= len(vals):
return ""
v = vals[col]
return str(v).strip() if v is not None else ""
os_val = get(col_os)
if "linux" not in os_val.lower():
continue
server_name = get(col_server)
if not server_name:
continue
# Parsing date
date_val = None
if col_date is not None and col_date < len(vals):
raw_date = vals[col_date]
if isinstance(raw_date, datetime):
date_val = raw_date.date()
elif isinstance(raw_date, date):
date_val = raw_date
elif raw_date:
try:
date_val = datetime.strptime(str(raw_date).strip()[:10], "%Y-%m-%d").date()
except Exception:
try:
date_val = datetime.strptime(str(raw_date).strip()[:10], "%d/%m/%Y").date()
except Exception:
date_val = None
# Accord — règle stricte :
# colonne absente OU cellule vide = NON (on ne patche pas sans accord explicite)
# Seul "oui" / True dans la cellule = accord confirmé
if col_accord is None or col_accord >= len(vals):
raw_accord = "non"
else:
raw_v = vals[col_accord]
if raw_v is None or str(raw_v).strip() == "":
raw_accord = "non" # vide = pas d'accord
elif isinstance(raw_v, bool):
raw_accord = "oui" if raw_v else "non"
else:
v_str = str(raw_v).strip().lower()
raw_accord = "oui" if v_str == "oui" else "non"
# Détecter si la cellule serveur a un fond vert = déjà patché
already_patched = False
if col_server is not None and col_server < len(cells):
try:
fill = cells[col_server].fill
bg = fill.fgColor.rgb if fill and fill.fgColor else ""
if bg == EXCEL_GREEN or bg.upper() == EXCEL_GREEN:
already_patched = True
except Exception:
pass
servers.append({
"server": server_name,
"os": os_val,
"env": get(col_env),
"domain": get(col_domain),
"app": get(col_app),
"accord": raw_accord if raw_accord else "non",
"date_patch": date_val,
"patcher": (get(col_patcher) or "").strip(),
"selected": False,
"snap_done": False,
"is_physical": False,
"status": "✅ DÉJÀ PATCHÉ" if already_patched else "",
"patch_status": "PATCHED" if already_patched else "",
"patch_detail": "Fond vert Excel" if already_patched else "",
"reboot_required": False,
"already_patched": already_patched,
"excel_row": cells[0].row if cells else None,
})
wb.close()
# Nettoyage copie temporaire
try:
os.remove(tmp_path)
os.rmdir(tmp_dir)
except Exception:
pass
return servers, None
# ==============================================================================
# SSH HELPERS
# ==============================================================================
def build_fqdn(server, env):
if "." in server:
return [server]
env_low = env.lower()
if "recette" in env_low or "rec" in env_low:
return [server, f"{server}.sanef-rec.fr"]
else:
return [server, f"{server}.sanef.groupe"]
def load_key(keyfile):
if not keyfile or not os.path.exists(keyfile.strip('"').strip("'")):
return None
keyfile = keyfile.strip('"').strip("'")
for cls in [paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey]:
try:
return cls.from_private_key_file(keyfile)
except Exception:
continue
return None
def ssh_connect(server, env, settings, pkey, pkey2, cyb_password=None):
"""Connexion SSH avec fallback FQDN et clé. Retourne (client, hostname_effectif)"""
candidates = build_fqdn(server, env)
is_prod = "prod" in env.lower() or "production" in env.lower()
ssh_port = int(settings.get("ssh_port", 22))
for hostname in candidates:
if is_prod and cyb_password:
# CyberArk keyboard-interactive (PSMP toujours port 22)
username = f"{settings['cybr_user']}@{settings['target_user']}@{hostname}"
password = cyb_password
try:
def handler(title, instructions, prompt_list):
return [password] * len(prompt_list)
transport = paramiko.Transport((settings["psmp"], 22))
transport.connect()
transport.auth_interactive(username, handler)
client = paramiko.SSHClient()
client._transport = transport
client._eff_host = hostname
return client, hostname
except Exception:
continue
else:
# SSH direct avec clé
for key in [k for k in [pkey, pkey2] if k]:
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=hostname, port=ssh_port,
username=settings["target_user"],
pkey=key, timeout=settings["timeout"],
look_for_keys=False, allow_agent=False)
client._eff_host = hostname
return client, hostname
except Exception:
continue
return None, None
def run_cmd(client, cmd, timeout=300):
try:
_, stdout, stderr = client.exec_command(cmd, timeout=timeout)
# Timeout sur le read pour eviter les blocages PSMP
stdout.channel.settimeout(timeout)
stderr.channel.settimeout(timeout)
out = stdout.read().decode("utf-8", errors="ignore").strip()
err = stderr.read().decode("utf-8", errors="ignore").strip()
return out, err
except socket.timeout:
return "", "TIMEOUT"
except Exception as e:
return "", str(e)
def run_cmd_stream(client, cmd, timeout=300, on_line=None):
"""Execute une commande SSH et streame la sortie ligne par ligne.
on_line(line) est appele pour chaque ligne recue en temps reel.
Retourne (full_output, stderr) comme run_cmd."""
try:
_, stdout, stderr = client.exec_command(cmd, timeout=timeout)
stdout.channel.settimeout(timeout)
stderr.channel.settimeout(timeout)
lines = []
buf = ""
while not stdout.channel.exit_status_ready() or stdout.channel.recv_ready():
if stdout.channel.recv_ready():
chunk = stdout.channel.recv(4096).decode("utf-8", errors="ignore")
buf += chunk
while "\n" in buf:
line, buf = buf.split("\n", 1)
line = line.rstrip("\r")
lines.append(line)
if on_line and line.strip():
on_line(line)
else:
time.sleep(0.1)
# Lire le reste
remaining = stdout.read().decode("utf-8", errors="ignore")
buf += remaining
for line in buf.splitlines():
line = line.rstrip("\r")
lines.append(line)
if on_line and line.strip():
on_line(line)
err = stderr.read().decode("utf-8", errors="ignore").strip()
return "\n".join(lines).strip(), err
except socket.timeout:
return "\n".join(lines).strip() if 'lines' in dir() else "", "TIMEOUT"
except Exception as e:
return "\n".join(lines).strip() if 'lines' in dir() else "", str(e)
def detect_physical(client):
"""Détecte si le serveur est physique ou VM"""
out, _ = run_cmd(client, "sudo virt-what 2>/dev/null || echo UNKNOWN", 10)
if not out or out == "UNKNOWN":
out2, _ = run_cmd(client, "sudo dmidecode -s system-manufacturer 2>/dev/null || echo UNKNOWN", 10)
vmware_kw = ["vmware", "virtualbox", "kvm", "xen", "qemu", "microsoft corporation"]
if any(k in out2.lower() for k in vmware_kw):
return False # VM
phys_kw = ["hp", "dell", "lenovo", "ibm", "supermicro", "cisco"]
if any(k in out2.lower() for k in phys_kw):
return True # Physique
if out and out != "UNKNOWN":
return False # virt-what a trouvé quelque chose = VM
return False # par défaut supposer VM (plus sûr)
def _is_fl_podman(server_name):
"""Retourne True si le serveur Flux Libre utilise Podman (pas les BST/HAProxy)"""
name_low = server_name.lower()
# BST = HAProxy load balancers → pas Podman, excludes standards
if "bst" in name_low or "hbst" in name_low or "bbst" in name_low or "abst" in name_low:
return False
return True
def build_yum_excludes(domain, exclude_kernel, server_name=""):
"""Construit la chaine d'excludes yum selon le domaine"""
domain_low = domain.lower()
is_flux_libre = "flux libre" in domain_low
if is_flux_libre and _is_fl_podman(server_name):
excludes = YUM_EXCLUDES_FLUX_LIBRE
else:
excludes = YUM_EXCLUDES_STD
if exclude_kernel:
excludes += " --exclude=*kernel*"
return excludes
def build_yum_command(domain, packages, exclude_kernel, dryrun, server_name="", extra_excludes=""):
"""Construit la commande yum selon le domaine"""
excludes = build_yum_excludes(domain, exclude_kernel, server_name)
if extra_excludes:
excludes += " " + extra_excludes
if dryrun:
if packages:
cmd = f"sudo yum check-update {packages} 2>/dev/null | grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]' | head -20"
else:
cmd = f"sudo yum check-update {excludes} 2>/dev/null | grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]' | head -30"
else:
if packages:
cmd = f"sudo yum update -y {packages} 2>&1"
else:
cmd = f"sudo yum update -y {excludes} 2>&1"
return cmd
# ==============================================================================
# LOGIN DIALOG
# ==============================================================================
class LoginDialog(tk.Toplevel):
def __init__(self, parent, db):
super().__init__(parent)
self.db = db
self.result = None # {"username":..., "role":...} ou None
self.title("SANEF Patch Manager - Connexion")
self.configure(bg="#1e1e2e")
self.resizable(False, False)
self.protocol("WM_DELETE_WINDOW", self._quit)
self._build()
center_window(self, 420, 340)
self.grab_set()
self.bind("<Return>", lambda e: self._login())
self.wait_window()
def _build(self):
BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"
tk.Label(self, text="SANEF PATCH MANAGER", bg="#181825", fg=ACCENT,
font=("Consolas", 14, "bold"), pady=10).pack(fill="x")
tk.Label(self, text="Authentification requise", bg=BG, fg="#6c7086",
font=("Consolas", 10)).pack(pady=(10, 15))
f = tk.Frame(self, bg=BG)
f.pack(pady=5)
tk.Label(f, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 11),
width=14, anchor="w").grid(row=0, column=0, padx=8, pady=6)
self.user_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11),
insertbackground=FG, width=22)
self.user_entry.grid(row=0, column=1, padx=8, pady=6)
self.user_entry.focus_set()
tk.Label(f, text="Mot de passe :", bg=BG, fg=FG, font=("Consolas", 11),
width=14, anchor="w").grid(row=1, column=0, padx=8, pady=6)
self.pwd_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11),
insertbackground=FG, width=22, show="*")
self.pwd_entry.grid(row=1, column=1, padx=8, pady=6)
self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8",
font=("Consolas", 10))
self.msg_label.pack(pady=8)
bf = tk.Frame(self, bg=BG)
bf.pack(pady=10)
tk.Button(bf, text="Connexion", bg="#40a02b", fg="white",
font=("Consolas", 11, "bold"), padx=20, pady=4,
command=self._login).pack(side="left", padx=8)
tk.Button(bf, text="Quitter", bg="#313244", fg="#cdd6f4",
font=("Consolas", 10), padx=12, pady=4,
command=self._quit).pack(side="left", padx=8)
tk.Label(self, text=f"SQATM Patch Manager v{VERSION}", bg=BG, fg="#45475a",
font=("Consolas", 9)).pack(side="bottom", pady=5)
def _login(self):
username = self.user_entry.get().strip()
password = self.pwd_entry.get()
if not username or not password:
self.msg_label.configure(text="Saisir utilisateur et mot de passe")
return
user, error = self.db.authenticate(username, password)
if error:
self.msg_label.configure(text=error)
self.pwd_entry.delete(0, "end")
return
self.db.log_action(username, "LOGIN", f"role={user['role']}")
self.result = user
self.destroy()
def _quit(self):
self.result = None
self.destroy()
# ==============================================================================
# CHANGE PASSWORD DIALOG
# ==============================================================================
class ChangePasswordDialog(tk.Toplevel):
def __init__(self, parent, db, username, forced=False):
super().__init__(parent)
self._parent = parent
self.db = db
self.username = username
self.forced = forced
self.success = False
self.title("Changement de mot de passe")
self.configure(bg="#1e1e2e")
self.resizable(False, False)
if forced:
self.protocol("WM_DELETE_WINDOW", lambda: None)
self._build()
center_window(self, 420, 300, parent=parent)
self.grab_set()
self.bind("<Return>", lambda e: self._change())
self.wait_window()
def _build(self):
BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"
msg = "Premier login : vous devez changer votre mot de passe" if self.forced else "Changement de mot de passe"
tk.Label(self, text=msg, bg="#181825", fg=ACCENT,
font=("Consolas", 11, "bold"), pady=10).pack(fill="x")
f = tk.Frame(self, bg=BG)
f.pack(pady=15)
tk.Label(f, text="Nouveau mdp :", bg=BG, fg=FG, font=("Consolas", 10),
width=16, anchor="w").grid(row=0, column=0, padx=8, pady=6)
self.new_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 10),
insertbackground=FG, width=22, show="*")
self.new_entry.grid(row=0, column=1, padx=8, pady=6)
self.new_entry.focus_set()
tk.Label(f, text="Confirmer :", bg=BG, fg=FG, font=("Consolas", 10),
width=16, anchor="w").grid(row=1, column=0, padx=8, pady=6)
self.confirm_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 10),
insertbackground=FG, width=22, show="*")
self.confirm_entry.grid(row=1, column=1, padx=8, pady=6)
self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8",
font=("Consolas", 10))
self.msg_label.pack(pady=5)
tk.Button(self, text="Valider", bg="#40a02b", fg="white",
font=("Consolas", 11, "bold"), padx=20, pady=4,
command=self._change).pack(pady=10)
def _change(self):
p1 = self.new_entry.get()
p2 = self.confirm_entry.get()
if len(p1) < 4:
self.msg_label.configure(text="Minimum 4 caracteres")
return
if p1 != p2:
self.msg_label.configure(text="Les mots de passe ne correspondent pas")
return
self.db.change_password(self.username, p1)
self.db.log_action(self.username, "CHANGE_PASSWORD", "")
self.success = True
self.destroy()
# ==============================================================================
# DIALOGUE SETTINGS
# ==============================================================================
class SettingsDialog(tk.Toplevel):
def __init__(self, parent, settings):
super().__init__(parent)
self._parent = parent
self.title("Parametres")
self.configure(bg="#1e1e2e")
self.resizable(True, True)
self.settings = dict(settings)
self.result = None
self._build()
# Taille adaptee a l'ecran
sh = parent.winfo_screenheight()
h = min(780, int(sh * 0.85))
center_window(self, 600, h, parent=parent)
self.grab_set()
self.wait_window()
def _build(self):
BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"
# Titre fixe en haut
tk.Label(self, text="PARAMETRES", bg="#313244", fg=ACCENT,
font=("Consolas",13,"bold"), pady=8).pack(fill="x")
# Zone scrollable
canvas = tk.Canvas(self, bg=BG, highlightthickness=0)
scrollbar = ttk.Scrollbar(self, orient="vertical", command=canvas.yview)
self.scroll_frame = tk.Frame(canvas, bg=BG)
self.scroll_frame.bind("<Configure>", lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
# Scroll molette
canvas.bind_all("<MouseWheel>", lambda e: canvas.yview_scroll(int(-1*(e.delta/120)), "units"))
sf = self.scroll_frame
def row(label, key, default=""):
f = tk.Frame(sf, bg=BG)
f.pack(fill="x", padx=20, pady=4)
tk.Label(f, text=label, bg=BG, fg=FG, font=("Consolas",10),
width=22, anchor="w").pack(side="left")
var = tk.StringVar(value=self.settings.get(key, default))
setattr(self, f"var_{key}", var)
tk.Entry(f, textvariable=var, bg=BG2, fg=FG,
font=("Consolas",10), insertbackground=FG,
width=32).pack(side="left", padx=4)
return var
tk.Label(sf, text="SSH — Recette", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Cle SSH principale :", "keyfile")
row("Cle SSH secondaire :", "keyfile2")
tk.Label(sf, text="CyberArk — Production", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Compte CyberArk :", "cybr_user", "CYBP01336")
row("Utilisateur cible :", "target_user", "cybsecope")
row("PSMP hostname :", "psmp", "psmp.sanef.fr")
tk.Label(sf, text="Identite patcheur", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Nom du patcheur :", "patcher", "")
tk.Label(sf, text="vSphere — Snapshots", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Utilisateur vCenter :", "vs_user", "")
f_pwd = tk.Frame(sf, bg=BG)
f_pwd.pack(fill="x", padx=20, pady=4)
tk.Label(f_pwd, text="Mot de passe vCenter :", bg=BG, fg=FG,
font=("Consolas",10), width=22, anchor="w").pack(side="left")
self.var_vs_pwd = tk.StringVar(value=self.settings.get("_vs_pwd_session", "")) # Session only
tk.Entry(f_pwd, textvariable=self.var_vs_pwd, show="*",
bg=BG2, fg=FG, font=("Consolas",10),
insertbackground=FG, width=32).pack(side="left", padx=4)
tk.Label(f_pwd, text="(non stocke)", bg=BG, fg="#6c7086",
font=("Consolas",8)).pack(side="left", padx=4)
tk.Label(sf, text="Connexion SSH", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Port SSH :", "ssh_port", "22")
row("Timeout SSH (s) :", "timeout", "20")
row("Parallelisme :", "parallelism", "3")
tk.Label(sf, text="Splunk HEC (logs)", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
self.var_splunk_enabled = tk.BooleanVar(
value=self.settings.get("splunk_enabled", "false") == "true")
tk.Checkbutton(sf, text="Activer l'envoi vers Splunk",
variable=self.var_splunk_enabled,
bg=BG, fg=FG, selectcolor=BG2, activebackground=BG,
font=("Consolas",10)).pack(anchor="w", padx=20)
row("URL HEC :", "splunk_url", "https://splunk.sanef.fr:8088")
# Token non stocke dans le JSON — session only
f_tok = tk.Frame(sf, bg=BG)
f_tok.pack(fill="x", padx=20, pady=4)
tk.Label(f_tok, text="Token HEC :", bg=BG, fg=FG,
font=("Consolas",10), width=22, anchor="w").pack(side="left")
self.var_splunk_token = tk.StringVar(value=self.settings.get("_splunk_token_session", "")) # Session only
tk.Entry(f_tok, textvariable=self.var_splunk_token, show="*",
bg=BG2, fg=FG, font=("Consolas",10),
insertbackground=FG, width=32).pack(side="left", padx=4)
tk.Label(f_tok, text="(non stocke)", bg=BG, fg="#6c7086",
font=("Consolas",8)).pack(side="left", padx=4)
row("Index :", "splunk_index", "main")
row("Proxy (optionnel) :", "proxy_url", "http://proxy.sanef.fr:8080")
tk.Label(sf, text="Notifications Teams", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Dossier SharePoint :", "sharepoint_notif_path", "")
tk.Label(sf, text="Chemin local du dossier SharePoint synchro (ex: C:\\Users\\xxx\\sanefgroupe...\\notifications)",
bg=BG, fg="#6c7086", font=("Consolas",7)).pack(anchor="w", padx=20)
tk.Label(sf, text="Logs réseau", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))
row("Dossier log réseau :", "network_log_path", "")
tk.Label(sf, text="Partage réseau pour les logs (ex: \\\\serveur\\share\\logs\\patching)",
bg=BG, fg="#6c7086", font=("Consolas",7)).pack(anchor="w", padx=20)
# Boutons en bas du scroll
bf = tk.Frame(sf, bg=BG)
bf.pack(pady=16)
tk.Button(bf, text="Sauvegarder", bg="#40a02b", fg="white",
font=("Consolas",11,"bold"), padx=12, pady=6,
command=self._save).pack(side="left", padx=8)
tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4",
font=("Consolas",10), padx=12, pady=6,
command=self.destroy).pack(side="left", padx=8)
def _save(self):
keys = ["keyfile","keyfile2","cybr_user","target_user","psmp","ssh_port","timeout","parallelism",
"patcher","vs_user","splunk_url","splunk_token","splunk_index","proxy_url",
"sharepoint_notif_path","network_log_path"]
for k in keys:
var = getattr(self, f"var_{k}", None)
if var:
val = var.get().strip()
if k in ("timeout","parallelism","ssh_port"):
try: val = int(val)
except: val = DEFAULT_SETTINGS.get(k, 22 if k == "ssh_port" else 20)
self.settings[k] = val
# Splunk enabled (stocke dans JSON — pas sensible)
if hasattr(self, "var_splunk_enabled"):
self.settings["splunk_enabled"] = "true" if self.var_splunk_enabled.get() else "false"
# vCenter pwd et Splunk token : session only, jamais dans le JSON
if hasattr(self, "var_vs_pwd"):
self.settings["_vs_pwd_session"] = self.var_vs_pwd.get()
if hasattr(self, "var_splunk_token"):
self.settings["_splunk_token_session"] = self.var_splunk_token.get()
self.result = self.settings
# Sauvegarder dans JSON SANS les secrets
to_save = {k: v for k, v in self.settings.items() if not k.startswith("_")}
save_settings(to_save)
self.destroy()
# ==============================================================================
# DIALOGUE CONFIRMATION COMMANDE YUM
# ==============================================================================
class YumConfirmDialog(tk.Toplevel):
def __init__(self, parent, server, cmd):
super().__init__(parent)
self.title("Confirmation commande patch")
self.configure(bg="#1e1e2e")
self.result = None # "apply", "apply_all", "cancel"
self.cmd_var = tk.StringVar(value=cmd)
self._build(server, cmd)
center_window(self, 680, 340, parent=parent)
self.grab_set()
self.wait_window()
def _build(self, server, cmd):
BG = "#1e1e2e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"; YELLOW = "#f9e2af"
tk.Label(self, text="⚠️ CONFIRMATION COMMANDE PATCH",
bg="#313244", fg=ACCENT, font=("Consolas",12,"bold"),
pady=8).pack(fill="x")
tk.Label(self, text=f"Serveur : {server}",
bg=BG, fg=YELLOW, font=("Consolas",11)).pack(anchor="w", padx=16, pady=6)
tk.Label(self, text="Commande à exécuter :",
bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", padx=16)
cmd_entry = tk.Text(self, height=5, bg="#11111b", fg="#a6e3a1",
font=("Consolas",10), wrap="word",
insertbackground=FG)
cmd_entry.insert("1.0", cmd)
cmd_entry.pack(fill="x", padx=16, pady=4)
self._cmd_text = cmd_entry
bf = tk.Frame(self, bg=BG)
bf.pack(pady=12)
tk.Button(bf, text="▶ Appliquer",
bg="#40a02b", fg="white", font=("Consolas",11,"bold"),
padx=10, pady=6,
command=lambda: self._set("apply")).pack(side="left", padx=6)
tk.Button(bf, text="▶▶ Appliquer à TOUS",
bg="#1e66f5", fg="white", font=("Consolas",11,"bold"),
padx=10, pady=6,
command=lambda: self._set("apply_all")).pack(side="left", padx=6)
tk.Button(bf, text="✗ Annuler ce serveur",
bg="#d20f39", fg="white", font=("Consolas",10),
padx=10, pady=6,
command=lambda: self._set("cancel")).pack(side="left", padx=6)
def _set(self, val):
self.result = val
self.final_cmd = self._cmd_text.get("1.0", "end").strip()
self.destroy()
# ==============================================================================
# APPLICATION PRINCIPALE
# ==============================================================================
class PatchManagerV2:
def __init__(self, root, db, current_user):
self.root = root
self.db = db
self.current_user = current_user # {"username":..., "role":...}
self.root.title(f"SANEF Linux Patch Manager v{VERSION}{current_user['username']} ({current_user['role']})")
self.root.configure(bg="#1e1e2e")
# Plein ecran sur le moniteur ou se trouve la fenetre
try:
center_window(root, root.winfo_screenwidth(), root.winfo_screenheight(), parent=root)
root.after(100, lambda: root.state("zoomed"))
except Exception:
root.state("zoomed")
self.settings = load_settings()
self.servers = []
self.results = []
self.running = False
self.pkey = None
self.pkey2 = None
self.cyb_pwd = None
self.apply_all_cmd = False
self.is_current_week = True # True = semaine courante (patch autorise)
self._build_ui()
self._reload_keys()
self.audit("APP_START", "")
# Verifier si des interventions Teams sont restees ouvertes (crash precedent)
self.root.after(500, self._teams_check_crash_recovery)
# ==========================================================================
# COULEURS
# ==========================================================================
BG = "#1e1e2e"
BG2 = "#2a2a3e"
FG = "#cdd6f4"
ACCENT = "#89b4fa"
GREEN = "#a6e3a1"
RED = "#f38ba8"
YELLOW = "#f9e2af"
ORANGE = "#fab387"
BTN = "#313244"
# ==========================================================================
# AUDIT HELPER
# ==========================================================================
def audit(self, action, details=""):
username = self.current_user["username"]
self.db.log_action(username, action, details)
# Envoi Splunk en arriere-plan (non bloquant)
threading.Thread(target=send_to_splunk, args=(self.settings, {
"action": action, "user": username, "details": details,
"timestamp": datetime.now().isoformat(),
}), daemon=True).start()
# Log réseau
server = details.split("|")[0].strip() if "|" in details else ""
threading.Thread(target=self._write_network_log,
args=(action, server, details), daemon=True).start()
# ==========================================================================
# NOTIFICATIONS TEAMS via SharePoint / Power Automate
# ==========================================================================
# Sous-dossiers SharePoint pour le routing
TEAMS_ROUTES = {
"dsi_general": "reboot",
"fl_prod": "fl_prod",
"fl_bst": "fl_bst",
"fl_horsprod": "fl_horsprod",
"peage": "peage",
"lan_delcour": "lan_delcour",
}
def _get_teams_route(self, server_info, msg_type):
"""Determine le sous-dossier SharePoint pour le message Teams.
msg_type: 'debut', 'fin', 'reboot'"""
# Reboot → toujours DSI General
if msg_type == "reboot":
return "dsi_general"
domain = server_info.get("domain", "").lower()
env = server_info.get("env", "").lower()
name = server_info.get("server", "").lower()
responsable = server_info.get("valideur", "").lower() # Responsable Domaine DTS
# Flux Libre
if "flux libre" in domain:
if "bst" in name:
return "fl_bst"
elif env in ("production",):
return "fl_prod"
else:
return "fl_horsprod"
# Peage
if "peage" in domain or "péage" in domain:
return "peage"
# LAN + responsable Laurent DELCOUR
if "delcour" in responsable:
return "lan_delcour"
# Default: pas de notification pour les autres
return None
def _format_teams_msg(self, server_name, msg_type, intervenant):
"""Formate le message Teams"""
nom = intervenant.capitalize() if intervenant else "SecOps"
if msg_type == "debut":
return f"[SECOPS] : Intervenant({nom}) => Debut d'intervention sur {server_name}"
elif msg_type == "reboot":
return f"[SECOPS] : Intervenant({nom}) => Reboot suite MAJ de {server_name}"
elif msg_type == "fin":
return f"[SECOPS] : Intervenant({nom}) => Fin d'intervention sur {server_name}"
elif msg_type == "annulation":
return f"[SECOPS] : Intervenant({nom}) => Annulation intervention sur {server_name}"
return ""
# Fichier d'etat Teams persistant (crash recovery)
TEAMS_STATE_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_teams_state.json")
def _teams_save_state(self):
"""Sauvegarde l'etat des notifications Teams en cours (pour crash recovery)"""
pending = []
for s in getattr(self, "servers", []):
if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"):
pending.append({
"server": s.get("server", ""),
"domain": s.get("domain", ""),
"env": s.get("env", ""),
"valideur": s.get("valideur", ""),
"timestamp": datetime.now().isoformat(),
})
try:
if pending:
with open(self.TEAMS_STATE_FILE, "w") as f:
json.dump(pending, f)
elif os.path.exists(self.TEAMS_STATE_FILE):
os.remove(self.TEAMS_STATE_FILE)
except Exception:
pass
def _teams_check_crash_recovery(self):
"""Au demarrage, verifie si des interventions sont restees ouvertes (crash/bug)"""
if not os.path.exists(self.TEAMS_STATE_FILE):
return
try:
with open(self.TEAMS_STATE_FILE, "r") as f:
pending = json.load(f)
if not pending:
return
names = "\n".join(p["server"] for p in pending[:10])
if messagebox.askyesno(
"Interventions non terminees",
f"L'application a detecte {len(pending)} intervention(s) sans message de fin :\n\n"
f"{names}\n\n"
f"Envoyer un message d'annulation Teams pour ces serveurs ?",
icon="warning"):
for p in pending:
self._send_teams_notification(p, "annulation")
os.remove(self.TEAMS_STATE_FILE)
except Exception:
try:
os.remove(self.TEAMS_STATE_FILE)
except Exception:
pass
def _send_teams_notification(self, server_info, msg_type):
"""Envoie une notification Teams via fichier SharePoint (non bloquant)"""
if hasattr(self, "dryrun_var") and self.dryrun_var.get():
return # Pas de notification en dry run
route = self._get_teams_route(server_info, msg_type)
if not route:
return # Pas de notification pour ce serveur
sp_base = self.settings.get("sharepoint_notif_path", "").strip()
if not sp_base:
return # SharePoint non configure
intervenant = self.settings.get("patcher", "").strip() or self.current_user.get("username", "SecOps")
message = self._format_teams_msg(server_info.get("server", ""), msg_type, intervenant)
if not message:
return
def _write_notif():
try:
# Ecrire le fichier Teams (message seul)
notif_dir = os.path.join(sp_base, route)
os.makedirs(notif_dir, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
fname = f"{msg_type}_{server_info.get('server', 'unknown')}_{ts}.txt"
fpath = os.path.join(notif_dir, fname)
with open(fpath, "w", encoding="utf-8") as f:
f.write(message)
except Exception:
pass
# Ecrire dans le log réseau (avec métadonnées)
self._write_network_log(msg_type, server_info.get("server", ""), message)
threading.Thread(target=_write_notif, daemon=True).start()
# Mettre a jour l'etat persistant
if msg_type == "debut":
self._teams_save_state()
elif msg_type in ("fin", "annulation"):
self._teams_save_state()
def _write_network_log(self, action, server, message=""):
"""Ecrit une ligne de log sur le partage réseau (avec métadonnées utilisateur)"""
log_path = self.settings.get("network_log_path", "").strip()
if not log_path:
return
try:
os.makedirs(log_path, exist_ok=True)
log_file = os.path.join(log_path, f"patch_manager_{date.today():%Y%m%d}.log")
app_user = self.current_user.get("username", "unknown")
win_user = os.environ.get("USERNAME", os.environ.get("USER", "unknown"))
hostname = socket.gethostname()
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"{ts} | {action:12s} | {app_user} | {win_user}@{hostname} | {server} | {message}\n"
with open(log_file, "a", encoding="utf-8") as f:
f.write(line)
except Exception:
pass # Ne jamais bloquer le patching
# ==========================================================================
# BUILD UI
# ==========================================================================
def _build_ui(self):
BG=self.BG; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
# Style ttk
style = ttk.Style()
style.theme_use("clam")
style.configure("TFrame", background=self.BG)
style.configure("TLabel", background=self.BG, foreground=self.FG, font=("Consolas",10))
style.configure("Treeview", background=self.BG2, foreground=self.FG,
fieldbackground=self.BG2, font=("Consolas",9), rowheight=22)
style.configure("Treeview.Heading", background=self.BTN, foreground=self.ACCENT,
font=("Consolas",10,"bold"))
style.map("Treeview", background=[("selected","#45475a")])
style.configure("TCombobox", fieldbackground=self.BG2, foreground=self.FG,
background=self.BG2, font=("Consolas",10))
style.configure("TCheckbutton",background=self.BG, foreground=self.FG, font=("Consolas",10))
# ── Barre titre ────────────────────────────────────────────────────────
tb = tk.Frame(self.root, bg="#181825", pady=6)
tb.pack(fill="x")
tk.Label(tb, text=f"⚡ SANEF LINUX PATCH MANAGER v{VERSION}",
bg="#181825", fg=ACCENT, font=("Consolas",14,"bold")).pack(side="left", padx=12)
tk.Label(tb, text=f"Semaine {date.today().isocalendar()[1]} | {date.today():%d/%m/%Y}",
bg="#181825", fg="#6c7086", font=("Consolas",10)).pack(side="left", padx=12)
tk.Button(tb, text="A propos", bg=BTN, fg="#6c7086",
font=("Consolas",9), pady=2,
command=self._show_about).pack(side="right", padx=4)
tk.Button(tb, text="Mon MDP", bg=BTN, fg="#fab387",
font=("Consolas",9), pady=2,
command=self._change_my_password).pack(side="right", padx=4)
tk.Button(tb, text="Settings", bg=BTN, fg=FG,
font=("Consolas",10), pady=2,
command=self._open_settings).pack(side="right", padx=12)
# ── Notebook (onglets) ─────────────────────────────────────────────────
nb_style = ttk.Style()
nb_style.configure("TNotebook", background=self.BG, borderwidth=0)
nb_style.configure("TNotebook.Tab", background=self.BTN, foreground=self.FG,
font=("Consolas",10,"bold"), padding=[12,4])
nb_style.map("TNotebook.Tab",
background=[("selected","#313244")],
foreground=[("selected",self.ACCENT)])
self.nb = ttk.Notebook(self.root)
self.nb.pack(fill="both", expand=True, padx=6, pady=4)
# Onglets
self.tab_servers = tk.Frame(self.nb, bg=BG)
self.tab_patch = tk.Frame(self.nb, bg=BG)
self.tab_post = tk.Frame(self.nb, bg=BG)
self.tab_report = tk.Frame(self.nb, bg=BG)
self.tab_audit = tk.Frame(self.nb, bg=BG)
self.nb.add(self.tab_servers, text="1. Selection serveurs")
self.nb.add(self.tab_patch, text="2. Patch")
self.nb.add(self.tab_post, text="3. Post-patching")
self.nb.add(self.tab_report, text="4. Rapport")
if self.current_user["role"] == "admin":
self.nb.add(self.tab_audit, text="5. Audit")
self.tab_users = tk.Frame(self.nb, bg=BG)
self.nb.add(self.tab_users, text="6. Utilisateurs")
self._build_tab_servers()
self._build_tab_patch()
self._build_tab_post()
self._build_tab_report()
if self.current_user["role"] == "admin":
self._build_tab_audit()
self._build_tab_users()
# Viewer = onglets patch/post desactives
if self.current_user["role"] == "viewer":
self.nb.tab(self.tab_patch, state="disabled")
self.nb.tab(self.tab_post, state="disabled")
# ==========================================================================
# ONGLET 1 — SÉLECTION SERVEURS
# ==========================================================================
def _build_tab_servers(self):
tab = self.tab_servers
BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
# ── Ligne 1 : Excel ───────────────────────────────────────────────────
ef = tk.Frame(tab, bg=BG)
ef.pack(fill="x", padx=10, pady=6)
tk.Label(ef, text="📊 Fichier Excel :", bg=BG, fg=FG,
font=("Consolas",10)).pack(side="left")
self.excel_var = tk.StringVar(value=self.settings.get("excel_file",""))
tk.Entry(ef, textvariable=self.excel_var, bg=BG2, fg=FG,
font=("Consolas",10), insertbackground=FG,
width=55).pack(side="left", padx=6)
tk.Button(ef, text="Parcourir", bg=BTN, fg=FG, font=("Consolas",9),
command=self._browse_excel).pack(side="left", padx=2)
# Feuille
tk.Label(ef, text="Feuille :", bg=BG, fg=FG,
font=("Consolas",10)).pack(side="left", padx=(12,2))
self.sheet_var = tk.StringVar()
self.sheet_cb = ttk.Combobox(ef, textvariable=self.sheet_var,
state="readonly", width=10)
self.sheet_cb.pack(side="left", padx=2)
self.sheet_cb.bind("<<ComboboxSelected>>", lambda e: self._load_sheet())
# Navigation rapide semaines
tk.Button(ef, text="", bg=BTN, fg=FG, font=("Consolas",10),
width=2, command=self._prev_week).pack(side="left", padx=1)
tk.Button(ef, text="", bg=BTN, fg=FG, font=("Consolas",10),
width=2, command=self._next_week).pack(side="left", padx=1)
tk.Label(ef, text="S. courante", bg=BG, fg="#6c7086",
font=("Consolas",9)).pack(side="left", padx=2)
tk.Button(ef, text="", bg=BTN, fg=self.ACCENT, font=("Consolas",10),
width=2, command=self._goto_current_week).pack(side="left", padx=1)
tk.Button(ef, text="Charger", bg="#1e66f5", fg="white",
font=("Consolas",10,"bold"), pady=3,
command=self._load_excel).pack(side="left", padx=8)
# Indicateur semaine courante/visu
self.week_mode_lbl = tk.Label(ef, text="", bg=self.BG,
font=("Consolas",10,"bold"))
self.week_mode_lbl.pack(side="left", padx=8)
# ── Ligne 2 : Filtres ─────────────────────────────────────────────────
ff = tk.Frame(tab, bg=BG)
ff.pack(fill="x", padx=10, pady=4)
tk.Label(ff, text="Filtres :", bg=BG, fg=ACCENT,
font=("Consolas",10,"bold")).pack(side="left")
# Date du jour
self.filter_today_var = tk.BooleanVar(value=False)
tk.Checkbutton(ff, text="📅 Aujourd'hui uniquement",
variable=self.filter_today_var,
bg=BG, fg=self.YELLOW, selectcolor=BG2,
activebackground=BG,
font=("Consolas",10)).pack(side="left", padx=8)
# Filtre Intervenant — visible uniquement pour admin
self.filter_patcher_var = tk.StringVar(value="Tous")
if self.current_user["role"] == "admin":
tk.Label(ff, text="Intervenant :", bg=BG, fg=self.YELLOW,
font=("Consolas",10,"bold")).pack(side="left", padx=(8,2))
self.filter_patcher_cb = ttk.Combobox(ff, textvariable=self.filter_patcher_var,
state="readonly", width=14)
self.filter_patcher_cb.pack(side="left", padx=2)
self.filter_patcher_cb.bind("<<ComboboxSelected>>", lambda e: self._apply_filters())
else:
self.filter_patcher_cb = None # Non-admin : pas de filtre intervenant
for label, attr in [("Env", "filter_env"), ("Domaine", "filter_domain"),
("Application", "filter_app")]:
tk.Label(ff, text=f"{label}:", bg=BG, fg=FG,
font=("Consolas",10)).pack(side="left", padx=(8,2))
var = tk.StringVar(value="Tous")
setattr(self, f"{attr}_var", var)
cb = ttk.Combobox(ff, textvariable=var, state="readonly", width=14)
setattr(self, f"{attr}_cb", cb)
cb.pack(side="left", padx=2)
tk.Button(ff, text="Rechercher", bg=BTN, fg=ACCENT,
font=("Consolas",10,"bold"), pady=3,
command=self._apply_filters).pack(side="left", padx=6)
# ── Ligne 3 : Actions sélection ───────────────────────────────────────
af = tk.Frame(tab, bg=BG)
af.pack(fill="x", padx=10, pady=2)
for txt, cmd in [("✓ Tout (accordé)", self._select_all_ok),
("✗ Aucun", self._select_none),
("↕ Inverser", self._invert)]:
tk.Button(af, text=txt, bg=BTN, fg=FG, font=("Consolas",9),
command=cmd).pack(side="left", padx=3)
self.count_lbl = tk.Label(af, text="", bg=BG, fg=self.YELLOW,
font=("Consolas",9))
self.count_lbl.pack(side="right", padx=10)
# ── Treeview serveurs ─────────────────────────────────────────────────
tree_frame = tk.Frame(tab, bg=BG)
tree_frame.pack(fill="both", expand=True, padx=10, pady=4)
cols = ("sel","accord","serveur","intervenant","env","domaine","app","date_patch","snap","statut")
self.srv_tree = ttk.Treeview(tree_frame, columns=cols,
show="headings", height=22)
hdrs = {"sel":"","accord":"Accord","serveur":"Serveur",
"intervenant":"Intervenant","env":"Env",
"domaine":"Domaine","app":"Application","date_patch":"Date patch",
"snap":"Snapshot","statut":"Statut"}
widths = {"sel":30,"accord":70,"serveur":180,"intervenant":90,"env":80,"domaine":120,
"app":150,"date_patch":90,"snap":80,"statut":100}
for c in cols:
self.srv_tree.heading(c, text=hdrs[c])
self.srv_tree.column(c, width=widths[c], anchor="center" if c in ("sel","accord","snap","statut") else "w")
self.srv_tree.tag_configure("no_accord", foreground="#585b70", background="#1e1e2e")
self.srv_tree.tag_configure("ok", foreground=self.GREEN)
self.srv_tree.tag_configure("warn", foreground=self.YELLOW)
self.srv_tree.tag_configure("ko", foreground=self.RED)
self.srv_tree.tag_configure("selected", foreground=self.FG, background="#313244")
self.srv_tree.tag_configure("normal", foreground=self.FG)
self.srv_tree.tag_configure("already_patched",foreground="#1e1e2e", background="#40a02b")
vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.srv_tree.yview)
hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.srv_tree.xview)
self.srv_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
self.srv_tree.pack(side="left", fill="both", expand=True)
vsb.pack(side="right", fill="y")
hsb.pack(side="bottom", fill="x")
self.srv_tree.bind("<ButtonRelease-1>", self._toggle_srv)
# ── Boutons prerequis + snapshot ──────────────────────────────────────
prereq_frame = tk.Frame(tab, bg=BG)
prereq_frame.pack(fill="x", padx=10, pady=4)
self.prereq_btn = tk.Button(prereq_frame, text="Verifier prerequis",
bg="#e64553", fg="white", font=("Consolas",12,"bold"),
pady=6, command=self._check_prerequisites)
self.prereq_btn.pack(side="left", padx=4)
self.prereq_status = tk.Label(prereq_frame, text="", bg=BG, fg=self.FG,
font=("Consolas",10))
self.prereq_status.pack(side="left", padx=12)
self.snap_btn = tk.Button(prereq_frame, text="Prendre snapshot vSphere",
bg="#313244", fg="#6c7086", font=("Consolas",10),
pady=4, state="disabled", command=self._take_snapshots)
self.snap_btn.pack(side="left", padx=4)
tk.Button(prereq_frame,
text="▶ Aller au Patch →",
bg="#40a02b", fg="white", font=("Consolas",11,"bold"),
pady=5,
command=lambda: self.nb.select(self.tab_patch)).pack(side="right", padx=4)
# ==========================================================================
# ONGLET 2 — PATCH
# ==========================================================================
def _build_tab_patch(self):
tab = self.tab_patch
BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
top = tk.Frame(tab, bg=BG)
top.pack(fill="both", expand=True, padx=10, pady=8)
# Colonne gauche : options (scrollable)
left_canvas = tk.Canvas(top, bg=BG, highlightthickness=0, width=380)
left_sb = ttk.Scrollbar(top, orient="vertical", command=left_canvas.yview)
left = tk.Frame(left_canvas, bg=BG)
left.bind("<Configure>", lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all")))
left_canvas.create_window((0, 0), window=left, anchor="nw")
left_canvas.configure(yscrollcommand=left_sb.set)
left_canvas.pack(side="left", fill="y")
left_sb.pack(side="left", fill="y")
left_canvas.bind_all("<MouseWheel>", lambda e: left_canvas.yview_scroll(int(-1*(e.delta/120)), "units"))
tk.Label(left, text="OPTIONS PATCH", bg=BTN, fg=ACCENT,
font=("Consolas",11,"bold"), padx=8, pady=4).pack(fill="x")
# Preset packages
tk.Label(left, text="Preset packages A PATCHER :", bg=BG, fg=FG,
font=("Consolas",10)).pack(anchor="w", pady=(8,2))
self.preset_var = tk.StringVar(value=list(PRESET_PACKAGES.keys())[0])
preset_cb = ttk.Combobox(left, textvariable=self.preset_var,
values=list(PRESET_PACKAGES.keys()),
state="readonly", width=30)
preset_cb.pack(anchor="w")
preset_cb.bind("<<ComboboxSelected>>", self._on_preset)
# Champ packages a patcher
tk.Label(left, text="Packages specifiques a PATCHER :", bg=BG, fg=FG,
font=("Consolas",10)).pack(anchor="w", pady=(8,2))
self.packages_var = tk.StringVar(value="")
self.packages_entry = tk.Entry(left, textvariable=self.packages_var, bg=BG2, fg=FG,
font=("Consolas",10), insertbackground=FG, width=32)
self.packages_entry.pack(anchor="w")
tk.Label(left, text="Ex: gnutls libsoup rhc",
bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w")
# Champ packages a exclure
tk.Label(left, text="Packages specifiques a EXCLURE :", bg=BG, fg=FG,
font=("Consolas",10)).pack(anchor="w", pady=(8,2))
self.custom_excludes_var = tk.StringVar(value="")
self.custom_excludes_entry = tk.Entry(left, textvariable=self.custom_excludes_var, bg=BG2, fg=FG,
font=("Consolas",10), insertbackground=FG, width=32)
self.custom_excludes_entry.pack(anchor="w")
tk.Label(left, text="Ex: kernel* podman* docker*",
bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w")
# Commande finale editable
tk.Label(left, text="Commande finale (editable) :", bg=BG, fg=self.YELLOW,
font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2))
self.final_cmd_var = tk.StringVar(value="")
self.final_cmd_entry = tk.Entry(left, textvariable=self.final_cmd_var,
bg="#181825", fg=self.YELLOW, font=("Consolas",9),
insertbackground=self.YELLOW, width=48)
self.final_cmd_entry.pack(anchor="w")
tk.Button(left, text="Generer la commande", bg=BTN, fg=FG,
font=("Consolas",9), command=self._preview_cmd).pack(anchor="w", pady=(4,0))
# Aide
help_frame = tk.Frame(left, bg="#181825", pady=4, padx=8)
help_frame.pack(fill="x", pady=(6,0))
tk.Label(help_frame, text="Preset = commande predeterminee",
bg="#181825", fg="#a6e3a1", font=("Consolas",8)).pack(anchor="w")
tk.Label(help_frame, text="Personnalise = packages a patcher ET/OU a exclure",
bg="#181825", fg="#f9e2af", font=("Consolas",8)).pack(anchor="w")
tk.Label(help_frame, text="La commande finale est toujours editable avant GO",
bg="#181825", fg="#6c7086", font=("Consolas",8)).pack(anchor="w")
# Options
self.exclude_kernel_var = tk.BooleanVar(value=True)
self.dryrun_var = tk.BooleanVar(value=True)
tk.Checkbutton(left, text="☑ Exclure kernel (recommandé)",
variable=self.exclude_kernel_var,
bg=BG, fg=self.YELLOW, selectcolor=BG2,
activebackground=BG,
font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2))
tk.Checkbutton(left, text="🔍 Dry Run (simulation)",
variable=self.dryrun_var,
bg=BG, fg=self.ORANGE, selectcolor=BG2,
activebackground=BG,
font=("Consolas",10,"bold")).pack(anchor="w", pady=2)
# Mot de passe CyberArk
tk.Label(left, text="Mot de passe CyberArk (prod) :",
bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(12,2))
self.cybpwd_var = tk.StringVar()
tk.Entry(left, textvariable=self.cybpwd_var, show="",
bg=BG2, fg=FG, font=("Consolas",10),
insertbackground=FG, width=28).pack(anchor="w")
# GO button
self.go_btn = tk.Button(left, text="🚀 GO PATCH",
bg="#d20f39", fg="white",
font=("Consolas",16,"bold"),
pady=12, padx=20,
command=self._go_patch)
self.go_btn.pack(fill="x", pady=16)
self.stop_btn = tk.Button(left, text="⏹ STOP",
bg="#6c7086", fg="white",
font=("Consolas",11,"bold"),
pady=6, state="disabled",
command=self._stop)
self.stop_btn.pack(fill="x")
# Colonne droite : log temps réel
right = tk.Frame(top, bg=BG)
right.pack(side="left", fill="both", expand=True)
tk.Label(right, text="DÉROULÉ EN TEMPS RÉEL",
bg=BTN, fg=ACCENT,
font=("Consolas",11,"bold"), pady=4).pack(fill="x")
self.patch_log = scrolledtext.ScrolledText(
right, height=28, bg="#11111b", fg=FG,
font=("Consolas",9), insertbackground=FG, state="disabled")
self.patch_log.pack(fill="both", expand=True, pady=4)
for tag, color in [("ok",self.GREEN),("ko",self.RED),
("warn",self.YELLOW),("info",ACCENT),
("cmd","#cba6f7")]:
self.patch_log.tag_configure(tag, foreground=color)
# Barre progression
pf = tk.Frame(tab, bg=BG)
pf.pack(fill="x", padx=10, pady=4)
self.prog_lbl = tk.Label(pf, text="En attente...",
bg=BG, fg=FG, font=("Consolas",9))
self.prog_lbl.pack(side="left", padx=4)
self.prog_var = tk.DoubleVar()
ttk.Progressbar(pf, variable=self.prog_var,
maximum=100, length=600).pack(side="left", fill="x", expand=True, padx=4)
# ==========================================================================
# ONGLET 3 — POST-PATCHING
# ==========================================================================
def _build_tab_post(self):
tab = self.tab_post
BG=self.BG; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
tk.Label(tab, text="POST-PATCHING", bg=BTN, fg=ACCENT,
font=("Consolas",12,"bold"), pady=6).pack(fill="x", padx=10, pady=8)
bf = tk.Frame(tab, bg=BG)
bf.pack(fill="x", padx=10, pady=4)
tk.Button(bf, text="🔍 Check post-patching",
bg="#1e66f5", fg="white", font=("Consolas",11,"bold"),
pady=6, command=self._post_patch_check).pack(side="left", padx=4)
tk.Button(bf, text="🗑 Supprimer anciens kernels",
bg="#e64553", fg="white", font=("Consolas",11,"bold"),
pady=6, command=self._remove_old_kernels).pack(side="left", padx=4)
tk.Button(bf, text="↩ Undo yum",
bg="#fe640b", fg="white", font=("Consolas",11,"bold"),
pady=6, command=self._undo_yum).pack(side="left", padx=4)
tk.Button(bf, text="🔄 Reboot serveurs sélectionnés",
bg="#8839ef", fg="white", font=("Consolas",11,"bold"),
pady=6, command=self._reboot_servers).pack(side="left", padx=4)
tk.Button(bf, text="🟩 Marquer patchés dans Excel",
bg="#40a02b", fg="white", font=("Consolas",11,"bold"),
pady=6, command=self._ask_mark_patched).pack(side="left", padx=4)
# Deuxième rangée de boutons
bf2 = tk.Frame(tab, bg=BG)
bf2.pack(fill="x", padx=10, pady=2)
tk.Button(bf2, text="🗑 Supprimer snapshots > 3 jours",
bg="#e64553", fg="white", font=("Consolas",11,"bold"),
pady=6, command=self._delete_old_snapshots).pack(side="left", padx=4)
self.post_log = scrolledtext.ScrolledText(
tab, bg="#11111b", fg=FG,
font=("Consolas",9), insertbackground=FG, state="disabled")
self.post_log.pack(fill="both", expand=True, padx=10, pady=4)
for tag, color in [("ok",self.GREEN),("ko",self.RED),
("warn",self.YELLOW),("info",ACCENT)]:
self.post_log.tag_configure(tag, foreground=color)
# ==========================================================================
# ONGLET 4 — RAPPORT PATCHING (style dashboard)
# ==========================================================================
def _build_tab_report(self):
tab = self.tab_report
BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
# ── Barre titre + boutons ─────────────────────────────────────────────
hf = tk.Frame(tab, bg=BTN, pady=6)
hf.pack(fill="x", padx=0, pady=0)
tk.Label(hf, text="RAPPORT DE PATCHING",
bg=BTN, fg=ACCENT, font=("Consolas",12,"bold")).pack(side="left", padx=12)
bf = tk.Frame(hf, bg=BTN)
bf.pack(side="right", padx=8)
tk.Button(bf, text="Generer", bg="#40a02b", fg="white",
font=("Consolas",10,"bold"), pady=3,
command=self._generate_report).pack(side="left", padx=4)
tk.Button(bf, text="CSV", bg=BTN, fg=FG,
font=("Consolas",10), pady=3,
command=self._export_csv).pack(side="left", padx=4)
tk.Button(bf, text="DokuWiki", bg=BTN, fg=FG,
font=("Consolas",10), pady=3,
command=self._export_dokuwiki).pack(side="left", padx=4)
# ── Widgets KPI (ligne de compteurs) ──────────────────────────────────
kpi_frame = tk.Frame(tab, bg=BG)
kpi_frame.pack(fill="x", padx=10, pady=8)
# Créer 6 widgets KPI
self.kpi_widgets = {}
kpi_defs = [
("total", "Total traités", "#313244", FG),
("ok", "Connexion OK", "#1a4a2e", self.GREEN),
("patched", "Patchés", "#1a4a2e", self.GREEN),
("uptodate", "Déjà à jour", "#2a2a3e", ACCENT),
("reboot", "Reboot requis", "#4a3000", self.YELLOW),
("ko", "KO / Auth", "#4a1a1a", self.RED),
]
for key, label, bg_col, fg_col in kpi_defs:
frame = tk.Frame(kpi_frame, bg=bg_col, padx=16, pady=10,
relief="flat", bd=0)
frame.pack(side="left", fill="x", expand=True, padx=4)
# Valeur
val_lbl = tk.Label(frame, text="", bg=bg_col, fg=fg_col,
font=("Consolas",24,"bold"))
val_lbl.pack()
# Label
tk.Label(frame, text=label, bg=bg_col, fg=fg_col,
font=("Consolas",9)).pack()
self.kpi_widgets[key] = val_lbl
# ── Tableau détaillé résultats ─────────────────────────────────────────
tk.Label(tab, text="DETAIL PAR SERVEUR",
bg=BG, fg=ACCENT, font=("Consolas",10,"bold")).pack(anchor="w", padx=12, pady=(4,2))
tree_frame = tk.Frame(tab, bg=BG)
tree_frame.pack(fill="both", expand=True, padx=10, pady=4)
rcols = ("serveur","env","domaine","accord","snap","connexion","patch","reboot","detail")
self.report_tree = ttk.Treeview(tree_frame, columns=rcols,
show="headings", height=14)
rhdrs = {
"serveur": "Serveur",
"env": "Env",
"domaine": "Domaine",
"accord": "Accord",
"snap": "Snapshot",
"connexion": "Connexion",
"patch": "Patch",
"reboot": "Reboot",
"detail": "Detail packages",
}
rwidths = {
"serveur":200,"env":80,"domaine":100,"accord":70,
"snap":80,"connexion":90,"patch":100,"reboot":70,"detail":280
}
for c in rcols:
self.report_tree.heading(c, text=rhdrs[c])
self.report_tree.column(c, width=rwidths[c],
anchor="center" if c not in ("serveur","detail") else "w")
self.report_tree.tag_configure("ok", foreground=self.GREEN)
self.report_tree.tag_configure("ko", foreground=self.RED)
self.report_tree.tag_configure("warn", foreground=self.YELLOW)
self.report_tree.tag_configure("patched", foreground=self.GREEN, background="#1a2e1a")
self.report_tree.tag_configure("reboot", foreground=self.YELLOW, background="#2e2a00")
self.report_tree.tag_configure("uptodate",foreground=ACCENT)
vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.report_tree.yview)
hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.report_tree.xview)
self.report_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
self.report_tree.pack(side="left", fill="both", expand=True)
vsb.pack(side="right", fill="y")
hsb.pack(side="bottom", fill="x")
# ── Zone log texte (optionnel, compact) ───────────────────────────────
tk.Label(tab, text="LOG BRUT",
bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w", padx=12, pady=(4,0))
self.report_txt = scrolledtext.ScrolledText(
tab, height=6, bg="#11111b", fg=FG,
font=("Consolas",8), insertbackground=FG, state="disabled")
self.report_txt.pack(fill="x", padx=10, pady=(0,6))
for tag, color in [("ok",self.GREEN),("ko",self.RED),
("warn",self.YELLOW),("info",ACCENT),
("title","#cba6f7")]:
self.report_txt.tag_configure(tag, foreground=color,
font=("Consolas",9,"bold") if tag=="title" else ("Consolas",8))
# ==========================================================================
# HELPERS LOG
# ==========================================================================
def _log(self, widget, msg, tag="info"):
def _do():
widget.configure(state="normal")
ts = datetime.now().strftime("%H:%M:%S")
widget.insert("end", f"[{ts}] {msg}\n", tag)
widget.see("end")
widget.configure(state="disabled")
self.root.after(0, _do)
def _log_patch(self, msg, tag="info"):
self._log(self.patch_log, msg, tag)
def _log_post(self, msg, tag="info"):
self._log(self.post_log, msg, tag)
# ==========================================================================
# PREREQUIS — SSH + Disque + Satellite + Snapshot
# ==========================================================================
def _prompt_password(self, title, label):
"""Prompt mot de passe centre sur le parent. Retourne le mdp ou None."""
dlg = tk.Toplevel(self.root)
dlg.title(title)
dlg.configure(bg="#1e1e2e")
dlg.resizable(False, False)
result = [None]
tk.Label(dlg, text=label, bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(padx=20, pady=(15, 5))
pwd_var = tk.StringVar()
e = tk.Entry(dlg, textvariable=pwd_var, show="*", bg="#2a2a3e", fg="#cdd6f4",
font=("Consolas", 11), insertbackground="#cdd6f4", width=30)
e.pack(padx=20, pady=5)
e.focus_set()
def _ok(event=None):
result[0] = pwd_var.get().strip()
dlg.destroy()
e.bind("<Return>", _ok)
tk.Button(dlg, text="OK", bg="#40a02b", fg="white", font=("Consolas", 10, "bold"),
padx=20, command=_ok).pack(pady=10)
center_window(dlg, 400, 140, parent=self.root)
dlg.grab_set()
dlg.wait_window()
return result[0]
def _ensure_psmp_pwd(self):
"""S'assure que le mot de passe PSMP est disponible (session). Retourne True si OK."""
if self.cyb_pwd:
return True
pwd = self.cybpwd_var.get().strip() if hasattr(self, 'cybpwd_var') else ""
if pwd:
self.cyb_pwd = pwd
return True
pwd = self._prompt_password("CyberArk PSMP",
f"Mot de passe CyberArk ({self.settings.get('cybr_user', 'CYBP01336')}) :")
if pwd:
self.cyb_pwd = pwd
if hasattr(self, 'cybpwd_var'):
self.cybpwd_var.set(pwd)
return True
return False
def _ensure_vcenter_pwd(self):
"""S'assure que les credentials vCenter sont disponibles (session). Retourne (user, pwd) ou None."""
vs_user = self.settings.get("vs_user", "").strip()
vs_pwd = self.settings.get("_vs_pwd_session", "").strip()
if vs_user and vs_pwd:
return vs_user, vs_pwd
if not vs_user:
vs_user = self._prompt_password("vCenter", "Utilisateur vCenter :")
if not vs_user:
return None
self.settings["vs_user"] = vs_user
if not vs_pwd:
vs_pwd = self._prompt_password("vCenter", f"Mot de passe vCenter ({vs_user}) :")
if not vs_pwd:
return None
self.settings["_vs_pwd_session"] = vs_pwd
return vs_user, vs_pwd
def _check_prerequisites(self):
selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
if not selected:
messagebox.showwarning("Prerequis", "Aucun serveur selectionne avec accord=OUI")
return
self._reload_keys()
# PSMP si serveurs prod
has_prod = any("prod" in s.get("env", "").lower() for s in selected)
if has_prod:
if not self._ensure_psmp_pwd():
messagebox.showwarning("Prerequis", "Mot de passe PSMP requis pour les serveurs Production")
return
self.prereq_btn.configure(state="disabled", text="Verification en cours...")
self.prereq_status.configure(text="")
self.audit("PREREQ_START", f"{len(selected)} serveurs (prod={has_prod})")
self._log_patch(f"Prerequis : {len(selected)} serveurs a verifier...", "info")
def worker():
results = {}
total = len(selected)
for idx, s in enumerate(selected):
server = s["server"]
env = s.get("env", "")
r = {"ssh": False, "disk_root": None, "disk_log": None, "satellite": None, "is_vm": server.lower().startswith("v")}
self.root.after(0, lambda sv=server, i=idx: self.prereq_status.configure(
text=f"[{i+1}/{total}] {sv}"))
self._log_patch(f"\n[{idx+1}/{total}] {server} ({env})", "info")
# 1. Check SSH
self._log_patch(f" SSH...", "info")
is_prod = "prod" in env.lower()
method = "PSMP" if is_prod else "cle SSH"
client, eff = ssh_connect(server, env, self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if not client:
r["ssh"] = False
r["error"] = "Connexion SSH impossible"
self._log_patch(f" SSH ({method}) ECHEC — Authentification KO", "ko")
# Si PSMP echoue, invalider le mot de passe pour reproposer
if is_prod and method == "PSMP":
self._log_patch(f" Verifier le mot de passe CyberArk ou le compte {self.settings.get('cybr_user','')}", "ko")
results[server] = r
continue
r["ssh"] = True
self._log_patch(f" SSH ({method}) Authentification OK : {eff}", "ok")
# 2. Check disque /
self._log_patch(f" Disque / ...", "info")
out, _ = run_cmd(client, "df -BM / | awk 'NR==2{print $4}' | tr -d 'M'", 10)
try:
r["disk_root"] = int(out.strip())
except (ValueError, AttributeError):
r["disk_root"] = -1
if r["disk_root"] >= 1200:
self._log_patch(f" Disque / : {r['disk_root']} Mo libre (OK >= 1200)", "ok")
else:
self._log_patch(f" Disque / : {r['disk_root']} Mo libre (KO < 1200)", "ko")
# Check disque /var/log
self._log_patch(f" Disque /var/log ...", "info")
out2, _ = run_cmd(client, "df -BM /var/log | awk 'NR==2{print $4}' | tr -d 'M'", 10)
try:
r["disk_log"] = int(out2.strip())
except (ValueError, AttributeError):
r["disk_log"] = -1
if r["disk_log"] >= 500:
self._log_patch(f" Disque /var/log : {r['disk_log']} Mo libre (OK >= 500)", "ok")
else:
self._log_patch(f" Disque /var/log : {r['disk_log']} Mo libre (KO < 500)", "ko")
# 3. Check Satellite
self._log_patch(f" Satellite ...", "info")
sat_out, _ = run_cmd(client, "sudo subscription-manager status 2>/dev/null | head -10", 15)
sat_low = sat_out.lower() if sat_out else ""
if "current" in sat_low or "valide" in sat_low or "disabled" in sat_low or "content access" in sat_low:
r["satellite"] = True
mode = "SCA" if "content access" in sat_low or "disabled" in sat_low else "classique"
self._log_patch(f" Satellite : OK ({mode})", "ok")
elif "unknown" in sat_low or "not registered" in sat_low:
r["satellite"] = False
self._log_patch(f" Satellite : NON enregistre", "ko")
elif sat_out and ("status" in sat_low or "system" in sat_low):
r["satellite"] = True
self._log_patch(f" Satellite : OK (reponse detectee)", "ok")
else:
r["satellite"] = None
self._log_patch(f" Satellite : N/A (subscription-manager absent)", "warn")
client.close()
results[server] = r
# Afficher les resultats dans l'UI
self.root.after(0, lambda: self._show_prereq_results(results, selected))
threading.Thread(target=worker, daemon=True).start()
def _show_prereq_results(self, results, selected):
self.prereq_btn.configure(state="normal", text="Verifier prerequis")
self._log_patch(f"\n{'='*50}", "info")
self._log_patch(f"RESUME PREREQUIS", "info")
self._log_patch(f"{'='*50}", "info")
# Construire le rapport
ok_servers = []
ko_servers = []
warn_servers = []
for s in selected:
server = s["server"]
r = results.get(server, {})
if not r.get("ssh"):
ko_servers.append((server, "SSH connexion impossible"))
s["prereq_status"] = "SSH_KO"
elif r.get("disk_root", 0) >= 0 and r["disk_root"] < 1200:
ko_servers.append((server, f"/ : {r['disk_root']} Mo libre (min 1200 Mo)"))
s["prereq_status"] = "DISK_KO"
elif r.get("disk_log", 0) >= 0 and r["disk_log"] < 500:
ko_servers.append((server, f"/var/log : {r['disk_log']} Mo libre (min 500 Mo)"))
s["prereq_status"] = "DISK_KO"
elif r.get("satellite") is False:
ko_servers.append((server, "Satellite : non enregistre"))
s["prereq_status"] = "SAT_KO"
else:
ok_servers.append(server)
s["prereq_status"] = "OK"
if r.get("satellite") is None:
warn_servers.append((server, "subscription-manager absent (Debian/physique ?)"))
# Log resume
for srv in ok_servers:
self._log_patch(f" OK : {srv}", "ok")
for srv, reason in warn_servers:
self._log_patch(f" WARN : {srv}{reason}", "warn")
for srv, reason in ko_servers:
self._log_patch(f" KO : {srv}{reason}", "ko")
self._log_patch(f"\nTotal : {len(ok_servers)} OK, {len(warn_servers)} WARN, {len(ko_servers)} KO", "info")
# Dialogue resultat
msg = f"Prerequis verifies : {len(selected)} serveurs\n\n"
msg += f"OK : {len(ok_servers)}\n"
if warn_servers:
msg += f"Avertissements : {len(warn_servers)}\n"
if ko_servers:
msg += f"BLOQUANTS : {len(ko_servers)}\n\n"
msg += "Serveurs bloques :\n"
for srv, reason in ko_servers:
msg += f" {srv} : {reason}\n"
msg += "\nDeselectionnez les serveurs bloques pour continuer."
if ko_servers:
# Deselectionner automatiquement les KO
for srv, _ in ko_servers:
for s in self.servers:
if s["server"] == srv:
s["selected"] = False
self._apply_filters()
messagebox.showwarning("Prerequis — problemes detectes", msg)
self.prereq_status.configure(text=f"{len(ko_servers)} serveurs bloques", fg=self.RED)
else:
messagebox.showinfo("Prerequis OK", msg)
self.prereq_status.configure(text=f"{len(ok_servers)} serveurs OK", fg=self.GREEN)
# Activer le bouton snapshot pour les VM
vm_count = sum(1 for s in selected if s.get("prereq_status") == "OK"
and s["server"].lower().startswith("v"))
if vm_count > 0:
self.snap_btn.configure(state="normal", bg="#e64553", fg="white")
self.audit("PREREQ_DONE", f"OK={len(ok_servers)} KO={len(ko_servers)} WARN={len(warn_servers)}")
# ==========================================================================
# SNAPSHOT — force sans snap
# ==========================================================================
def _force_patch_without_snap(self, server_name):
"""Demande une justification pour patcher une VM sans snapshot"""
dlg = tk.Toplevel(self.root)
dlg.title("Patching sans snapshot")
dlg.configure(bg="#1e1e2e")
dlg.resizable(False, False)
result = [None]
tk.Label(dlg, text=f"ATTENTION — VM sans snapshot", bg="#181825", fg="#f38ba8",
font=("Consolas", 12, "bold"), pady=8).pack(fill="x")
tk.Label(dlg, text=f"Serveur : {server_name}", bg="#1e1e2e", fg="#cdd6f4",
font=("Consolas", 11)).pack(pady=(10, 5))
tk.Label(dlg, text="Justification obligatoire :", bg="#1e1e2e", fg="#f9e2af",
font=("Consolas", 10)).pack(anchor="w", padx=20)
justif_var = tk.StringVar()
tk.Entry(dlg, textvariable=justif_var, bg="#2a2a3e", fg="#cdd6f4",
font=("Consolas", 10), insertbackground="#cdd6f4", width=50).pack(padx=20, pady=5)
def accept():
j = justif_var.get().strip()
if len(j) < 5:
messagebox.showwarning("Justification", "Minimum 5 caracteres")
return
result[0] = j
dlg.destroy()
bf = tk.Frame(dlg, bg="#1e1e2e")
bf.pack(pady=10)
tk.Button(bf, text="Forcer le patching", bg="#d20f39", fg="white",
font=("Consolas", 10, "bold"), command=accept).pack(side="left", padx=8)
tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4",
font=("Consolas", 10), command=dlg.destroy).pack(side="left", padx=8)
center_window(dlg, 500, 220, parent=self.root)
dlg.grab_set()
dlg.wait_window()
if result[0]:
self.audit("FORCE_PATCH_NO_SNAP", f"{server_name} : {result[0]}")
return result[0]
# ==========================================================================
# A PROPOS
# ==========================================================================
def _show_about(self):
dlg = tk.Toplevel(self.root)
dlg.title("A propos")
dlg.configure(bg="#1e1e2e")
dlg.resizable(False, False)
tk.Frame(dlg, bg="#e94560", height=3).pack(fill="x")
tk.Label(dlg, text="SANEF Linux Patch Manager",
bg="#1e1e2e", fg="#89b4fa", font=("Consolas", 16, "bold")).pack(pady=(20, 4))
tk.Label(dlg, text=f"Version {VERSION}",
bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 12)).pack()
tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12)
tk.Label(dlg, text="SANEF DSI — Securite Operationnelle",
bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack()
tk.Label(dlg, text="(KM)",
bg="#1e1e2e", fg="#a6e3a1", font=("Consolas", 11)).pack(pady=(4, 0))
tk.Label(dlg, text="SECOPS 2026",
bg="#1e1e2e", fg="#6c7086", font=("Consolas", 10)).pack(pady=(2, 0))
tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12)
tk.Label(dlg, text="Orchestration du patching Linux\nvia Excel + SSH + vSphere",
bg="#1e1e2e", fg="#6c7086", font=("Consolas", 9), justify="center").pack()
tk.Button(dlg, text="Fermer", bg="#313244", fg="#cdd6f4",
font=("Consolas", 10), padx=20, pady=4,
command=dlg.destroy).pack(pady=16)
center_window(dlg, 380, 320, parent=self.root)
dlg.grab_set()
# ==========================================================================
# SETTINGS
# ==========================================================================
def _open_settings(self):
dlg = SettingsDialog(self.root, self.settings)
if dlg.result:
self.settings = dlg.result
self._reload_keys()
def _reload_keys(self):
self.pkey = load_key(self.settings.get("keyfile",""))
self.pkey2 = load_key(self.settings.get("keyfile2",""))
# ==========================================================================
# ONGLET 1 — LOGIQUE
# ==========================================================================
def _browse_excel(self):
path = filedialog.askopenfilename(
filetypes=[("Excel","*.xlsx *.xls"),("All","*.*")])
if path:
self.excel_var.set(path)
self.settings["excel_file"] = path
save_settings(self.settings)
self._load_excel()
def _prev_week(self):
sheets = list(self.sheet_cb["values"])
if not sheets: return
cur = self.sheet_var.get()
idx = sheets.index(cur) if cur in sheets else 0
if idx > 0:
self.sheet_var.set(sheets[idx-1])
self._load_sheet()
def _next_week(self):
sheets = list(self.sheet_cb["values"])
if not sheets: return
cur = self.sheet_var.get()
idx = sheets.index(cur) if cur in sheets else 0
if idx < len(sheets)-1:
self.sheet_var.set(sheets[idx+1])
self._load_sheet()
def _goto_current_week(self):
week_num = date.today().isocalendar()[1]
sheets = list(self.sheet_cb["values"])
for s in [f"S{week_num}", f"S{week_num:02d}"]:
if s in sheets:
self.sheet_var.set(s)
self._load_sheet()
return
def _load_excel(self):
path = self.excel_var.get().strip()
if not path or not os.path.exists(path):
messagebox.showerror("Erreur", f"Fichier introuvable :\n{path}")
return
self.audit("LOAD_EXCEL", path)
try:
import io
try:
with open(path, "rb") as f:
data = io.BytesIO(f.read())
wb = openpyxl.load_workbook(data, read_only=True, data_only=True)
except Exception:
wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
sheets = wb.sheetnames
wb.close()
# Mettre à jour le combobox feuilles
self.sheet_cb["values"] = sheets
week_sheet = None
week_num = date.today().isocalendar()[1]
for s in sheets:
if str(week_num) in s:
week_sheet = s
break
self.sheet_var.set(week_sheet or (sheets[0] if sheets else ""))
self._load_sheet()
except Exception as e:
messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}")
def _load_sheet(self):
path = self.excel_var.get().strip()
sheet = self.sheet_var.get()
if not path or not sheet:
return
servers, _ = read_excel_servers(path, sheet)
if servers is None:
messagebox.showerror("Erreur", "Aucun serveur Linux trouvé")
return
# Détecter si c'est la semaine courante
week_num = date.today().isocalendar()[1]
self.is_current_week = any(str(week_num) in sheet for sheet in [sheet])
# Mettre à jour l'indicateur visuel
if self.is_current_week:
self.week_mode_lbl.configure(
text="✅ SEMAINE COURANTE — Patch autorisé",
fg=self.GREEN)
else:
self.week_mode_lbl.configure(
text=f"👁 VUE SEULE ({sheet}) — Patch désactivé",
fg=self.YELLOW)
self.servers = servers
# Peupler les filtres
envs = sorted(set(s["env"] for s in servers if s["env"]))
domains = sorted(set(s["domain"] for s in servers if s["domain"]))
apps = sorted(set(s["app"] for s in servers if s["app"]))
patchers = sorted(set(s["patcher"].strip() for s in servers if s.get("patcher") and s["patcher"].strip()))
self.filter_env_cb["values"] = ["Tous"] + envs
self.filter_domain_cb["values"] = ["Tous"] + domains
self.filter_app_cb["values"] = ["Tous"] + apps
if self.filter_patcher_cb:
self.filter_patcher_cb["values"] = ["Tous"] + patchers
self.filter_env_var.set("Tous")
self.filter_domain_var.set("Tous")
self.filter_app_var.set("Tous")
# Auto-selectionner le patcheur
if self.current_user["role"] == "admin":
patcher_setting = self.settings.get("patcher", "").strip()
if patcher_setting and patcher_setting in patchers:
self.filter_patcher_var.set(patcher_setting)
else:
self.filter_patcher_var.set("Tous")
else:
self.filter_patcher_var.set("Tous") # Non-admin filtre par _apply_filters
self._apply_filters()
# Griser/activer le bouton GO selon la semaine
if hasattr(self, "go_btn"):
if self.is_current_week:
self.go_btn.configure(bg="#d20f39", state="normal",
text="🚀 GO PATCH")
else:
self.go_btn.configure(bg="#45475a", state="normal",
text="👁 VUE SEULE — Patch désactivé")
# Vérifier snapshots si des serveurs ont accord=oui
accord_servers = [s for s in self.servers
if s.get("accord") == "oui" and not s.get("is_physical")]
if accord_servers and VSPHERE_OK:
self.root.after(200, self._prompt_vcenter_and_check_snaps)
def _apply_filters(self):
env_f = self.filter_env_var.get()
domain_f = self.filter_domain_var.get()
app_f = self.filter_app_var.get()
patcher_f = self.filter_patcher_var.get()
today_f = self.filter_today_var.get()
today = date.today()
# Non-admin : ne voit que ses serveurs (intervenant = son username)
is_admin = self.current_user["role"] == "admin"
my_name = self.current_user["username"].lower()
filtered = []
for s in self.servers:
# Filtre par utilisateur connecte (non-admin)
if not is_admin:
srv_patcher = (s.get("patcher", "") or "").strip().lower()
if srv_patcher != my_name:
continue
if patcher_f != "Tous":
srv_patcher = (s.get("patcher", "") or "").strip()
patcher_cmp = patcher_f.strip()
if srv_patcher and srv_patcher.lower() != patcher_cmp.lower():
continue
if env_f != "Tous" and s["env"] != env_f: continue
if domain_f != "Tous" and s["domain"] != domain_f: continue
if app_f != "Tous" and s["app"] != app_f: continue
if today_f and s["date_patch"] and s["date_patch"] != today: continue
filtered.append(s)
self._populate_tree(filtered)
def _populate_tree(self, servers):
for item in self.srv_tree.get_children():
self.srv_tree.delete(item)
# Compteur pour gérer les doublons de nom de serveur dans la feuille
seen = {}
for s in servers:
accord_ok = s["accord"] == "oui"
sel_mark = "" if s.get("selected") and accord_ok else ("" if accord_ok else "")
accord_lbl= "✅ OUI" if accord_ok else "❌ NON"
snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "")
date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else ""
if s.get("already_patched"):
tag = "already_patched"
elif not accord_ok:
tag = "no_accord"
elif s.get("selected"):
tag = "selected"
else:
tag = "normal"
# Générer un iid unique si le nom de serveur apparaît en double
base_iid = s["server"]
seen[base_iid] = seen.get(base_iid, 0) + 1
iid = base_iid if seen[base_iid] == 1 else f"{base_iid}_#{seen[base_iid]}"
s["_iid"] = iid # stocker pour _refresh_tree_item
self.srv_tree.insert("", "end",
iid=iid,
values=(sel_mark, accord_lbl, s["server"],
s.get("patcher",""), s["env"],
s["domain"], s["app"], date_str, snap_lbl, s["status"]),
tags=(tag,))
n = sum(1 for s in servers if s.get("selected") and s["accord"]=="oui")
self.count_lbl.configure(text=f"{len(servers)} serveurs affichés — {n} sélectionnés")
def _get_server(self, name):
for s in self.servers:
if s["server"] == name:
return s
return None
def _get_server_by_iid(self, iid):
"""Retrouver un serveur par son iid treeview (gère les doublons de noms)."""
for s in self.servers:
if s.get("_iid") == iid:
return s
# Fallback : chercher par nom (compatibilité)
return self._get_server(iid)
def _toggle_srv(self, event):
region = self.srv_tree.identify_region(event.x, event.y)
if region != "cell": return
item = self.srv_tree.identify_row(event.y)
if not item: return
# Retrouver le serveur par son iid stocké (gère les doublons)
s = self._get_server_by_iid(item)
if not s or s["accord"] != "oui": return # accord obligatoire
s["selected"] = not s.get("selected", False)
self._refresh_tree_item(s)
n = sum(1 for x in self.servers if x.get("selected") and x["accord"]=="oui")
self.count_lbl.configure(text=f"{n} sélectionnés")
def _refresh_tree_item(self, s):
# Chercher l'iid : d'abord _iid stocké, sinon nom serveur, sinon chercher dans l'arbre
iid = s.get("_iid", s["server"])
if not self.srv_tree.exists(iid):
# Fallback : chercher parmi tous les items de l'arbre par valeur "server"
for item in self.srv_tree.get_children():
vals = self.srv_tree.item(item, "values")
if vals and len(vals) > 2 and vals[2] == s["server"]:
iid = item
s["_iid"] = iid # mémoriser pour la prochaine fois
break
else:
return # serveur pas dans l'arbre (filtré)
accord_ok = s["accord"] == "oui"
sel_mark = "" if s.get("selected") else ""
snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "")
date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else ""
tag = "selected" if s.get("selected") else "normal"
self.srv_tree.item(iid, values=(
sel_mark, "✅ OUI" if accord_ok else "❌ NON",
s["server"], s.get("patcher",""), s["env"], s["domain"], s["app"],
date_str, snap_lbl, s["status"]), tags=(tag,))
def _select_all_ok(self):
for s in self.servers:
if s["accord"] == "oui" and self.srv_tree.exists(s["server"]):
s["selected"] = True
self._refresh_tree_item(s)
n = sum(1 for s in self.servers if s.get("selected"))
self.count_lbl.configure(text=f"{n} sélectionnés")
def _select_none(self):
for s in self.servers:
s["selected"] = False
if self.srv_tree.exists(s["server"]):
self._refresh_tree_item(s)
self.count_lbl.configure(text="— 0 sélectionnés")
def _invert(self):
for s in self.servers:
if s["accord"] == "oui" and self.srv_tree.exists(s["server"]):
s["selected"] = not s.get("selected", False)
self._refresh_tree_item(s)
n = sum(1 for s in self.servers if s.get("selected"))
self.count_lbl.configure(text=f"{n} sélectionnés")
# ==========================================================================
# SNAPSHOT VSPHERE
# ==========================================================================
def _prompt_vcenter_and_check_snaps(self):
"""Verifie les snapshots — prompt vCenter si pas encore saisi dans la session."""
accord_servers = [s for s in self.servers
if s.get("selected") and s.get("accord") == "oui" and not s.get("is_physical")]
if not accord_servers:
return
creds = self._ensure_vcenter_pwd()
if not creds:
self._log_patch("Verification snapshot ignoree (pas de credentials vCenter)", "warn")
return
vs_user, vs_pwd = creds
self._log_patch(f"Verification snapshots ({len(accord_servers)} serveurs)...", "info")
threading.Thread(target=self._check_snap_worker,
args=(accord_servers, vs_user, vs_pwd), daemon=True).start()
def _get_snap_identifier(self):
"""Retourne le nom/identifiant pour le snapshot : patcheur ou username"""
patcher = self.settings.get("patcher", "").strip()
if patcher:
return patcher
return self.current_user.get("username", "secops")
def _check_snap_worker(self, servers, vs_user, vs_pwd):
"""Thread : v\u00e9rifie l'existence d'un snapshot r\u00e9cent du patcheur sur les vCenters."""
snap_id = self._get_snap_identifier()
vc_errors = []
from datetime import timezone, timedelta
def has_recent_snap(vm):
"""Retourne True si la VM a un snapshot du patcheur de moins d'1 heure"""
if not vm.snapshot or not vm.snapshot.rootSnapshotList:
return False
now = datetime.now(timezone.utc)
def walk(snap_list):
for snap in snap_list:
# Snap du patcheur (nom contient l'identifiant)
if snap.name.startswith("SLPM_") and snap_id.lower() in snap.name.lower():
# Verifier age < 1 heure
try:
snap_time = snap.createTime
if snap_time.tzinfo is None:
snap_time = snap_time.replace(tzinfo=timezone.utc)
age = (now - snap_time).total_seconds()
if age < 3600:
return True
except Exception:
return True # En cas de doute, considerer comme existant
if snap.childSnapshotList and walk(snap.childSnapshotList):
return True
return False
return walk(vm.snapshot.rootSnapshotList)
# Connexion a tous les vCenters une seule fois
import ssl
vc_connections = {} # vc_host -> (si, content)
for vc in VSPHERE_HOSTS:
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx)
vc_connections[vc] = (si, si.RetrieveContent())
self._log_patch(f" vCenter {vc} : connecte", "ok")
except Exception as e:
err_str = str(e)
if "10060" in err_str or "timed out" in err_str.lower():
self._log_patch(f" vCenter {vc} : INJOIGNABLE (timeout)", "ko")
elif "10061" in err_str or "refused" in err_str.lower():
self._log_patch(f" vCenter {vc} : CONNEXION REFUSEE", "ko")
elif "auth" in err_str.lower() or "login" in err_str.lower() or "401" in err_str:
self._log_patch(f" vCenter {vc} : AUTHENTIFICATION ECHOUEE", "ko")
else:
self._log_patch(f" vCenter {vc} : ERREUR — {err_str[:80]}", "ko")
vc_errors.append(vc)
# Pour chaque serveur, chercher dans l'ordre prioritaire de ses vCenters
for s in servers:
if s.get("snap_done"):
continue
vc_order = get_vcenter_order_for_server(s["server"])
for vc in vc_order:
if vc not in vc_connections:
continue
_, content = vc_connections[vc]
vm = self._find_vm(content, s["server"])
if not vm:
continue
if has_recent_snap(vm):
s["snap_done"] = True
self._log_patch(f" ✅ Snapshot existant : {s['server']} ({vc.split('.')[0]})", "ok")
self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
else:
self._log_patch(f" ⚠ Pas de snapshot : {s['server']} ({vc.split('.')[0]})", "warn")
break # VM trouvee sur ce vCenter, pas besoin de chercher les autres
# Deconnexion
for vc, (si, _) in vc_connections.items():
try: Disconnect(si)
except: pass
# Bilan
ok = sum(1 for s in servers if s.get("snap_done"))
ko = len(servers) - ok
not_found = [s["server"] for s in servers if not s.get("snap_done")]
if vc_errors:
self._log_patch(f"vCenter injoignables : {', '.join(vc_errors)} — les VMs de ces vCenter n'ont pas pu etre verifiees", "ko")
if not_found:
self._log_patch(f"VMs sans snapshot : {', '.join(not_found[:10])}", "warn")
self._log_patch(
f"Bilan snapshots : {ok} OK, {ko} manquant(s) sur {len(servers)} serveur(s)",
"ok" if ko == 0 else "warn")
def _center_dialog(self, dlg, w, h):
"""Centre un dialog sur le meme ecran que la fenetre principale."""
center_window(dlg, w, h, parent=self.root)
def _take_snapshots(self):
selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not selected:
messagebox.showwarning("Attention", "Aucun serveur sélectionné (avec accord).")
return
if not VSPHERE_OK:
messagebox.showwarning("pyVmomi manquant",
"pyVmomi non installé.\n\n"
"Installer avec :\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080\n\n"
"Après installation, relancer l'application.")
return
creds = self._ensure_vcenter_pwd()
if not creds:
self._log_patch("Snapshot annule (pas de credentials vCenter)", "warn")
return
vs_user, vs_pwd = creds
self._log_patch(f"Creation snapshots ({len(selected)} serveurs)...", "info")
threading.Thread(target=self._snap_worker,
args=(selected, vs_user, vs_pwd), daemon=True).start()
def _snap_worker(self, servers, vs_user, vs_pwd):
snap_id = self._get_snap_identifier()
snap_name = f"SLPM_{snap_id}_{datetime.now():%Y%m%d_%H%M}"
from datetime import timezone as _tz
def has_recent_snap(vm):
if not vm.snapshot or not vm.snapshot.rootSnapshotList:
return False
now = datetime.now(_tz.utc)
def walk(snap_list):
for snap in snap_list:
if snap.name.startswith("SLPM_") and snap_id.lower() in snap.name.lower():
try:
st = snap.createTime
if st.tzinfo is None: st = st.replace(tzinfo=_tz.utc)
if (now - st).total_seconds() < 3600: return True
except Exception: return True
if snap.childSnapshotList and walk(snap.childSnapshotList): return True
return False
return walk(vm.snapshot.rootSnapshotList)
# Connexion a tous les vCenters une seule fois
import ssl
vc_connections = {}
for vc in VSPHERE_HOSTS:
self._log_patch(f"Connexion vCenter : {vc}", "info")
try:
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx)
vc_connections[vc] = (si, si.RetrieveContent())
self._log_patch(f" vCenter {vc} : connecte", "ok")
except Exception as e:
err_str = str(e)
if "10060" in err_str or "timed out" in err_str.lower():
self._log_patch(f" vCenter {vc} : INJOIGNABLE (timeout)", "ko")
elif "10061" in err_str or "refused" in err_str.lower():
self._log_patch(f" vCenter {vc} : CONNEXION REFUSEE", "ko")
elif "auth" in err_str.lower() or "login" in err_str.lower():
self._log_patch(f" vCenter {vc} : AUTHENTIFICATION ECHOUEE", "ko")
else:
self._log_patch(f" vCenter {vc} : ERREUR — {err_str[:80]}", "ko")
# Pour chaque serveur, chercher/creer le snapshot dans l'ordre prioritaire
for s in servers:
if s.get("snap_done"):
continue
vc_order = get_vcenter_order_for_server(s["server"])
for vc in vc_order:
if vc not in vc_connections:
continue
_, content = vc_connections[vc]
vm = self._find_vm(content, s["server"])
if not vm:
continue
# VM trouvee sur ce vCenter
if has_recent_snap(vm):
s["snap_done"] = True
self._log_patch(f" Snapshot recent existant : {s['server']} ({vc.split('.')[0]})", "ok")
self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
else:
try:
task = vm.CreateSnapshot_Task(
name=snap_name,
description=f"Pre-patch automatique {date.today()}",
memory=False, quiesce=False)
while task.info.state not in ("success","error"):
time.sleep(1)
if task.info.state == "success":
s["snap_done"] = True
self._log_patch(f" Snapshot OK : {s['server']} ({vc.split('.')[0]})", "ok")
self.audit("SNAP_CREATE", f"{s['server']} | {snap_name} | OK")
self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
else:
self._log_patch(f" Snapshot KO : {s['server']} ({task.info.error})", "ko")
self.audit("SNAP_CREATE", f"{s['server']} | {snap_name} | ECHEC")
except Exception as e:
self._log_patch(f" {s['server']} : {e}", "ko")
break # VM trouvee, pas besoin de chercher les autres vCenters
# Deconnexion
for vc, (si, _) in vc_connections.items():
try: Disconnect(si)
except: pass
# Bilan final
ok = [s for s in servers if s.get("snap_done")]
ko = [s for s in servers if not s.get("snap_done") and not s.get("is_physical")]
for s in ko:
self._log_patch(f" VM non trouvee sur aucun vCenter : {s['server']}", "warn")
self._log_patch(
f"Snapshots terminés : {len(ok)}/{len(servers)} OK", "ok" if not ko else "warn")
# Rafraîchir l'arbre complet à la fin et basculer vers l'onglet Patch
def _finish_snap():
for sv in servers:
self._refresh_tree_item(sv)
self.nb.select(self.tab_patch)
self.root.after(0, _finish_snap)
def _find_vm(self, content, name):
"""Cherche une VM par nom exact ou partiel dans tout le vCenter"""
name_short = name.split(".")[0].lower() # Sans le domaine
def search_folder(folder):
try:
for obj in folder.childEntity:
if hasattr(obj, "childEntity"):
result = search_folder(obj)
if result:
return result
if hasattr(obj, "name"):
vm_name = obj.name.lower()
if vm_name == name.lower() or vm_name == name_short:
return obj
if name_short in vm_name or vm_name in name_short:
return obj
except Exception:
pass
return None
try:
for dc in content.rootFolder.childEntity:
result = search_folder(dc.vmFolder)
if result:
return result
except Exception:
pass
return None
# ==========================================================================
# ONGLET 2 — LOGIQUE PATCH
# ==========================================================================
def _on_preset(self, event=None):
val = PRESET_PACKAGES.get(self.preset_var.get(), "")
if val == "CUSTOM":
# Mode personnalise — activer les champs
self.packages_entry.configure(state="normal")
self.custom_excludes_entry.configure(state="normal")
self.packages_var.set("")
self.custom_excludes_var.set("")
else:
self.packages_var.set(val)
self.custom_excludes_var.set("")
self._preview_cmd()
def _preview_cmd(self):
"""Genere la commande yum et l'affiche dans le champ editable"""
preset = self.preset_var.get()
packages = self.packages_var.get().strip()
custom_excl = self.custom_excludes_var.get().strip()
exclude_kernel = self.exclude_kernel_var.get()
dryrun = self.dryrun_var.get()
if preset == "Personnalise..." or custom_excl:
# Mode personnalise
if packages and not custom_excl:
# Packages specifiques a patcher uniquement
if dryrun:
cmd = f"sudo yum check-update {packages} 2>/dev/null | head -30"
else:
cmd = f"sudo yum update -y {packages} 2>&1 | tail -25"
elif custom_excl:
# Excludes personnalises
excl_parts = " ".join(f"--exclude={e.strip()}" for e in custom_excl.split() if e.strip())
if exclude_kernel and "--exclude=kernel" not in excl_parts.lower():
excl_parts += " --exclude=*kernel*"
if packages:
if dryrun:
cmd = f"sudo yum check-update {packages} {excl_parts} 2>/dev/null | head -30"
else:
cmd = f"sudo yum update -y {packages} {excl_parts} 2>&1 | tail -25"
else:
if dryrun:
cmd = f"sudo yum check-update {excl_parts} 2>/dev/null | head -30"
else:
cmd = f"sudo yum update -y {excl_parts} 2>&1 | tail -25"
else:
cmd = "sudo yum check-update 2>/dev/null | head -30" if dryrun else "sudo yum update -y 2>&1 | tail -25"
else:
# Mode preset standard — utilise build_yum_command
cmd = build_yum_command("standard", packages, exclude_kernel, dryrun)
cmd = "sudo " + cmd if not cmd.startswith("sudo") else cmd
self.final_cmd_var.set(cmd)
def _go_patch(self):
# Bloquer si pas la semaine courante
if not self.is_current_week:
messagebox.showwarning(
"Patch desactive",
"Vous consultez une autre semaine que la semaine courante.\n\n"
"Le patch n'est autorise que sur la semaine en cours.\n"
"Utilisez le bouton ↺ pour revenir a la semaine courante.")
return
selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not selected:
messagebox.showwarning("Attention", "Aucun serveur sélectionné avec accord.")
return
# Vérifier snapshot pour les VMs
no_snap = [s for s in selected if not s.get("snap_done") and not s.get("is_physical")]
if no_snap:
names = "\n".join(s["server"] for s in no_snap[:8])
ans = messagebox.askyesno(
"⚠️ Snapshot manquant",
f"{len(no_snap)} serveur(s) sans snapshot :\n{names}\n\n"
"Continuer SANS snapshot ?",
icon="warning")
if not ans:
return
if not self.dryrun_var.get():
if not messagebox.askyesno(
"Confirmation PATCH RÉEL",
f"Lancer le PATCH RÉEL sur {len(selected)} serveur(s) ?\n\n"
"Cette action est irréversible !",
icon="warning"):
return
self.cyb_pwd = self.cybpwd_var.get().strip() or None
self.running = True
self.apply_all_cmd = False
# Reprise apres STOP : garder les statuts et results
# Nouveau lancement : tout remettre a zero
if getattr(self, "_stopped", False):
self._stopped = False
else:
self.results = []
for s in selected:
if s.get("status") not in ("✅ DÉJÀ PATCHÉ",):
s["status"] = ""
s["patch_status"] = ""
s["patch_detail"] = ""
s["reboot_required"] = False
s.pop("_teams_debut_sent", None)
s.pop("_teams_fin_sent", None)
s.pop("dep_excluded", None)
self.go_btn.configure(state="disabled")
self.stop_btn.configure(state="normal")
threading.Thread(
target=self._patch_worker,
args=(selected,),
daemon=True).start()
def _stop(self):
self.running = False
self._stopped = True # Distinguer STOP (reprise) vs fin normale
self._log_patch("⏹ Arrêt demandé...", "warn")
# Envoyer annulation Teams pour les serveurs en cours (debut envoyé mais pas fin)
for s in self.servers:
if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"):
self._send_teams_notification(s, "annulation")
s["_teams_fin_sent"] = True # Marquer comme terminé pour éviter doublon
@staticmethod
def _rpm_name(full_nevra):
"""Extrait le nom du paquet depuis un NEVRA complet.
Ex: runc-1.1.12-4.el8.x86_64 → runc
container-selinux-2:2.229.0-2.el8.noarch → container-selinux"""
# Retirer l'arch (.x86_64, .noarch, .i686)
for arch in (".x86_64", ".noarch", ".i686", ".aarch64", ".i386", ".src"):
if full_nevra.endswith(arch):
full_nevra = full_nevra[:-len(arch)]
break
# Retirer version-release : derniers 2 segments apres "-"
parts = full_nevra.rsplit("-", 2)
if len(parts) == 3 and (any(c.isdigit() for c in parts[1]) or ":" in parts[1]):
return parts[0]
if len(parts) >= 2 and any(c.isdigit() for c in parts[-1]):
return "-".join(parts[:-1])
return full_nevra
def _detect_dep_errors(self, yum_output):
"""Analyse la sortie yum pour detecter les erreurs de dependances.
Retourne (has_error, dep_packages_to_exclude, error_detail)"""
import re
has_error = False
pkgs_to_exclude = set()
details = []
lines = yum_output.splitlines()
in_error_block = False
for i, line in enumerate(lines):
low = line.lower().strip()
# Detecter le bloc d'erreur
if low.startswith("error:") or low.startswith("problem:"):
has_error = True
in_error_block = True
if not in_error_block:
continue
# "package xxx is filtered out by exclude filtering"
# → le paquet exclu empeche une dependance
if "is filtered out by exclude" in low or "is excluded" in low:
m = re.search(r'package\s+(\S+)', line, re.I)
if m:
details.append(line.strip())
# "package xxx requires yyy, but none of the providers can be installed"
# → xxx ne peut pas etre installe, il faut l'exclure
if "requires" in low and ("none of the providers" in low or "excluded" in low or "filtered" in low):
m = re.search(r'package[:\s]+(\S+)', line, re.I)
if m:
pkg = self._rpm_name(m.group(1))
pkgs_to_exclude.add(pkg)
details.append(f"{pkg} : dependance bloquee")
# "nothing provides xxx needed by yyy"
# → yyy ne peut pas etre installe
if "nothing provides" in low and "needed by" in low:
m = re.search(r'needed by\s+(\S+)', line, re.I)
if m:
pkg = self._rpm_name(m.group(1))
pkgs_to_exclude.add(pkg)
details.append(f"{pkg} : dependance manquante")
# "Problem: package xxx-version requires yyy"
if low.startswith("problem:") or low.startswith("- package"):
m = re.search(r'package\s+(\S+)\s+requires', line, re.I)
if m:
pkg = self._rpm_name(m.group(1))
pkgs_to_exclude.add(pkg)
# "(try to add '--skip-broken' to skip uninstallable packages)"
if "skip-broken" in low:
has_error = True
# Bloc vide = fin du bloc erreur
if low == "" and in_error_block:
in_error_block = False
return has_error, list(pkgs_to_exclude), details
def _patch_worker(self, servers):
total = len(servers)
packages = self.packages_var.get().strip()
exclude_kernel = self.exclude_kernel_var.get()
dryrun = self.dryrun_var.get()
approved_cmd = None # commande approuvée pour "appliquer à tous"
for idx, s in enumerate(servers):
if not self.running:
break
server = s["server"]
# Skip serveurs deja traites (reprise apres STOP)
if s.get("status") in ("OK", "IGNORE"):
self._log_patch(f"\n[{idx+1}/{total}] ── {server} ── (deja traite, skip)", "info")
self._update_progress(idx+1, total)
continue
self._log_patch(f"\n[{idx+1}/{total}] ── {server} ──", "info")
# Construire la commande yum
yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel, dryrun, server)
# Confirmation commande (sauf si "appliquer à tous" déjà validé)
if not self.apply_all_cmd:
event = threading.Event()
dialog_result = [None]
def show_dialog(cmd=yum_cmd, srv=server, ev=event, res=dialog_result):
dlg = YumConfirmDialog(self.root, srv, cmd)
res[0] = dlg.result
if dlg.result in ("apply","apply_all"):
yum_cmd_final = dlg.final_cmd
res.append(yum_cmd_final)
ev.set()
self.root.after(0, show_dialog)
event.wait()
action = dialog_result[0]
if action == "cancel":
self._log_patch(f" ↩ Serveur ignoré par l'utilisateur", "warn")
# Si debut Teams deja envoye (reprise), envoyer annulation
if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"):
self._send_teams_notification(s, "annulation")
s["_teams_fin_sent"] = True
s["status"] = "IGNORE"
self._update_tree_status(s)
continue
elif action == "apply_all":
self.apply_all_cmd = True
if len(dialog_result) > 1:
yum_cmd = dialog_result[1]
approved_cmd = yum_cmd
elif action == "apply":
if len(dialog_result) > 1:
yum_cmd = dialog_result[1]
else:
if approved_cmd:
yum_cmd = approved_cmd
self._log_patch(f" Connexion...", "info")
t0 = time.time()
client, eff_host = ssh_connect(
server, s["env"], self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if client is None:
dur = round(time.time() - t0, 1)
self._log_patch(f" ❌ Connexion impossible ({dur}s)", "ko")
# Si debut Teams envoye (reprise apres crash), envoyer annulation
if s.get("_teams_debut_sent") and not s.get("_teams_fin_sent"):
self._send_teams_notification(s, "annulation")
s["_teams_fin_sent"] = True
s["status"] = "AUTH_KO"
self.results.append({**s, "duration": f"{dur}s",
"patch_status":"KO","patch_detail":"Connexion impossible"})
self._update_tree_status(s)
self._update_progress(idx+1, total)
continue
self._log_patch(f" ✅ Connecté : {eff_host}", "ok")
# Détection physique
s["is_physical"] = detect_physical(client)
if s["is_physical"]:
self._log_patch(f" 🖥 Serveur physique détecté", "warn")
# Pré-patching
self._log_patch(f" Pre-patching...", "info")
pre_out, _ = run_cmd(client, PRE_PATCH_SCRIPT, 30)
if "PRE_PATCH_OK" in pre_out:
self._log_patch(f" Pre-patch OK", "ok")
else:
self._log_patch(f" Pre-patch incomplet", "warn")
# Flux Libre Podman — snapshot pods (pas les BST)
is_fl = "flux libre" in s.get("domain", "").lower() and _is_fl_podman(server)
if is_fl:
self._log_patch(f" Flux Libre : snapshot pods Podman...", "info")
fl_out, _ = run_cmd(client, FL_PRE_PATCH_SCRIPT, 30)
if "FL_PRE_OK" in fl_out:
for line in fl_out.splitlines():
if line.startswith("FL_USER=") or line.startswith("FL_PODS_SAVED="):
self._log_patch(f" {line}", "ok")
elif "FL_NO_PODS" in fl_out:
self._log_patch(f" Pas de pods Podman detectes", "info")
else:
self._log_patch(f" Snapshot pods incomplet", "warn")
# Snapshot versions avant
ver_before = {}
if packages:
for pkg in packages.split():
out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15)
ver_before[pkg] = out.strip()
# Detecter le gestionnaire de paquets distant (yum/dnf vs apt)
pkg_mgr_out, _ = run_cmd(client, "command -v apt 2>/dev/null && echo APT || (command -v dnf 2>/dev/null && echo DNF || echo YUM)", 5)
is_apt = "APT" in pkg_mgr_out
# Si apt detecte et commande yum → adapter automatiquement
if is_apt and ("yum " in yum_cmd or "dnf " in yum_cmd):
self._log_patch(f" Systeme Debian/Ubuntu detecte → adaptation apt", "info")
if dryrun:
yum_cmd = "sudo apt update 2>&1 && apt list --upgradable 2>/dev/null"
else:
yum_cmd = "sudo apt update 2>&1 && sudo DEBIAN_FRONTEND=noninteractive apt upgrade -y 2>&1"
elif is_apt and "apt " in yum_cmd:
# Commande apt personnalisee — juste ajouter DEBIAN_FRONTEND et nettoyer les pipes yum
if "DEBIAN_FRONTEND" not in yum_cmd:
yum_cmd = yum_cmd.replace("sudo apt", "sudo DEBIAN_FRONTEND=noninteractive apt")
# Supprimer les filtres grep yum qui cassent la sortie apt
if "| grep" in yum_cmd:
yum_cmd = yum_cmd.split("2>&1")[0] + "2>&1" if "2>&1" in yum_cmd else yum_cmd.split("2>/dev/null")[0] + "2>&1"
# Notification Teams : debut d'intervention (une seule fois par serveur)
if not s.get("_teams_debut_sent"):
self._send_teams_notification(s, "debut")
s["_teams_debut_sent"] = True
self._log_patch(f" Commande : {yum_cmd[:120]}...", "cmd")
self.audit("PATCH_CMD", f"{server} | {yum_cmd[:120]}")
# Callback pour affichage temps reel
def _on_yum_line(line, _s=server):
self._log_patch(f" {line}", "info")
yum_out, yum_err = run_cmd_stream(client, yum_cmd, 300, on_line=_on_yum_line)
yum_full = yum_out + "\n" + yum_err
# ── Gestion erreurs yum (retry intelligent) ──
extra_excludes = ""
all_excluded_pkgs = []
gpg_retried = False
if not dryrun:
for retry in range(4):
yum_low = yum_full.lower()
# 1) Erreur GPG → retry avec --nogpgcheck
if not gpg_retried and ("gpg check failed" in yum_low or
"gpg keys" in yum_low and "incorrect" in yum_low):
gpg_retried = True
self._log_patch(f" ⚠ Erreur GPG detectee → retry avec --nogpgcheck", "warn")
yum_cmd_gpg = yum_cmd.replace("yum update -y", "yum update -y --nogpgcheck")
self._log_patch(f" Commande : {yum_cmd_gpg[:120]}...", "cmd")
yum_out, yum_err = run_cmd_stream(client, yum_cmd_gpg, 300, on_line=_on_yum_line)
yum_full = yum_out + "\n" + yum_err
yum_cmd = yum_cmd_gpg
continue
# 2) Erreur dependances → detecter paquets bloques et les exclure
has_dep_err, dep_pkgs, dep_details = self._detect_dep_errors(yum_full)
if not has_dep_err or not dep_pkgs:
break
# Eviter boucle infinie (memes paquets)
new_pkgs = [p for p in dep_pkgs if p not in all_excluded_pkgs]
if not new_pkgs:
break
all_excluded_pkgs.extend(new_pkgs)
for d in dep_details:
self._log_patch(f"{d}", "warn")
new_excludes = " ".join(f"--exclude=*{p}*" for p in new_pkgs)
extra_excludes += " " + new_excludes
self._log_patch(f" ↻ Paquets bloques exclus : {', '.join(new_pkgs)}", "warn")
yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel,
dryrun, server, extra_excludes)
if gpg_retried:
yum_cmd = yum_cmd.replace("yum update -y", "yum update -y --nogpgcheck")
self._log_patch(f" Commande : {yum_cmd[:120]}...", "cmd")
yum_out, yum_err = run_cmd_stream(client, yum_cmd, 300, on_line=_on_yum_line)
yum_full = yum_out + "\n" + yum_err
if all_excluded_pkgs:
self._log_patch(f" Paquets exclus (dependances) : {', '.join(set(all_excluded_pkgs))}", "warn")
s["dep_excluded"] = list(set(all_excluded_pkgs))
if gpg_retried:
self._log_patch(f" --nogpgcheck utilise", "warn")
# Snapshot versions après
ver_after = {}
if packages:
for pkg in packages.split():
out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15)
ver_after[pkg] = out.strip()
dur = round(time.time() - t0, 1)
# Detecter erreur residuelle (apres retries)
final_err, final_dep_pkgs, _ = self._detect_dep_errors(yum_full)
has_yum_error = final_err and final_dep_pkgs
# Erreurs fatales non recuperables
yum_low = yum_full.lower()
if "gpg check failed" in yum_low and not gpg_retried:
has_yum_error = True
self._log_patch(f" ❌ Erreur GPG non resolue", "ko")
if "timeout" in yum_low and ("mirror" in yum_low or "metadata" in yum_low):
has_yum_error = True
self._log_patch(f" ❌ Timeout repo/mirror", "ko")
if "cannot prepare internal mirrorlist" in yum_low:
has_yum_error = True
self._log_patch(f" ❌ Erreur mirrorlist", "ko")
# Déterminer statut
if dryrun:
pkg_lines = [l for l in yum_out.splitlines()
if l.split() and "." in l.split()[0]
and not any(x in l for x in ["kB","bps","00:","Updat"])]
if pkg_lines:
s["patch_status"] = "UPDATE_AVAIL"
s["patch_detail"] = " | ".join(pkg_lines)[:200]
self._log_patch(f" ⚠ Updates disponibles : {len(pkg_lines)} package(s)", "warn")
else:
s["patch_status"] = "UP_TO_DATE"
s["patch_detail"] = "Deja a jour"
self._log_patch(f" ✅ A jour", "ok")
elif has_yum_error and "Complete!" not in yum_out:
# Erreur yum non resolue
excluded_str = ", ".join(s.get("dep_excluded", [])) or ", ".join(final_dep_pkgs)
s["patch_status"] = "DEP_ERROR"
s["patch_detail"] = f"Erreur dependances : {excluded_str}"[:200]
self._log_patch(f" ❌ ERREUR dependances : {excluded_str}", "ko")
self.audit("PATCH_ERROR", f"{server} | DEP_ERROR | {excluded_str[:100]}")
else:
updated = [p for p in (packages.split() if packages else [])
if ver_before.get(p) != ver_after.get(p)]
# Compter les paquets mis a jour (RHEL + Debian)
import re
pkg_count = 0
pkg_names = []
for line in yum_full.splitlines():
# RHEL: "Updated:" ou "Upgraded:" ou "Installed:" dans le résumé
# Ligne de paquet yum: " openssl.x86_64 1:1.1.1k-14.el8_10"
m_yum = re.match(r'^\s{2,}(\S+\.\S+)\s+\S+', line)
if m_yum and not any(x in line for x in ["kB","bps","00:","http","Updat","Load"]):
pkg_names.append(m_yum.group(1).split(".")[0])
# Debian: "XX mis à jour, XX nouvellement installés"
m_apt = re.search(r'(\d+)\s+mis\s+[àa]\s+jour', line)
if m_apt:
pkg_count = max(pkg_count, int(m_apt.group(1)))
# Debian EN: "XX upgraded"
m_apt_en = re.search(r'(\d+)\s+upgraded', line)
if m_apt_en:
pkg_count = max(pkg_count, int(m_apt_en.group(1)))
# Debian: lignes "Inst paquet version"
m_inst = re.match(r'^Inst\s+(\S+)', line)
if m_inst:
pkg_names.append(m_inst.group(1))
# Debian: "Préparation du dépaquetage de .../paquet_version"
m_deb = re.match(r'^D[eé]paquetage de\s+(\S+)', line)
if m_deb:
pkg_names.append(m_deb.group(1))
# Detecter si patché
is_patched = (
updated
or "Complete!" in yum_out # RHEL yum
or "Complet" in yum_out # RHEL dnf FR
or pkg_count > 0 # Debian "N mis à jour"
or len(pkg_names) > 0 # Paquets detectes
)
# Detecter si rien a faire
is_uptodate = (
"Nothing to do" in yum_out
or "No packages" in yum_out
or "0 mis à jour" in yum_full
or "0 upgraded" in yum_full
or "Rien à faire" in yum_full
)
if is_patched and not is_uptodate:
s["patch_status"] = "PATCHED"
if updated:
detail = " | ".join(
f"{p}: {ver_before.get(p,'?')} -> {ver_after.get(p,'?')}"
for p in updated)[:200]
elif pkg_names:
unique_pkgs = list(dict.fromkeys(pkg_names)) # dedup preserving order
n = len(unique_pkgs)
detail = f"{n} paquet(s) : {', '.join(unique_pkgs[:10])}"
if n > 10:
detail += f" ... +{n-10}"
elif pkg_count > 0:
detail = f"{pkg_count} paquet(s) mis a jour"
else:
detail = "Packages mis a jour"
if s.get("dep_excluded"):
detail += f" (exclus: {', '.join(s['dep_excluded'])})"
s["patch_detail"] = detail[:200]
self._log_patch(f" ✅ PATCHE : {s['patch_detail'][:120]}", "ok")
self.audit("PATCH_DONE", f"{server} | PATCHE | {s['patch_detail'][:100]}")
elif is_uptodate:
s["patch_status"] = "UP_TO_DATE"
s["patch_detail"] = "Deja a jour"
self._log_patch(f" ✅ Deja a jour", "ok")
self.audit("PATCH_DONE", f"{server} | DEJA A JOUR")
else:
s["patch_status"] = "UP_TO_DATE"
s["patch_detail"] = "Deja a jour"
# Vérifier reboot nécessaire (RHEL: needs-restarting, Debian: /var/run/reboot-required)
reboot_out, _ = run_cmd(client,
"if command -v needs-restarting &>/dev/null; then "
"needs-restarting -r 2>/dev/null; echo RC:$?; "
"elif [ -f /var/run/reboot-required ]; then echo RC:1; "
"else echo RC:0; fi", 15)
if "RC:1" in reboot_out:
s["reboot_required"] = True
self._log_patch(f" ⚠ REBOOT REQUIS", "warn")
else:
s["reboot_required"] = False
s["status"] = "OK"
s["duration"] = f"{dur}s"
# Notification Teams : fin d'intervention (apres patching)
if not s.get("_teams_fin_sent"):
self._send_teams_notification(s, "fin")
s["_teams_fin_sent"] = True
client.close()
self.results.append(deepcopy(s))
self._update_tree_status(s)
self._update_progress(idx+1, total)
# Fin
self.running = False
self.root.after(0, lambda: [
self.go_btn.configure(state="normal"),
self.stop_btn.configure(state="disabled"),
self._patch_done_summary()
])
def _update_tree_status(self, s):
def _do():
if not self.srv_tree.exists(s["server"]): return
vals = list(self.srv_tree.item(s["server"], "values"))
vals[9] = s.get("status", "") # statut est maintenant col 9
tag = ("ok" if s.get("status")=="OK" and s.get("patch_status") not in ("KO",)
else "ko" if s.get("status")=="AUTH_KO" else "normal")
self.srv_tree.item(s["server"], values=vals, tags=(tag,))
self.root.after(0, _do)
def _update_progress(self, done, total):
pct = (done/total)*100
self.root.after(0, lambda: [
self.prog_var.set(pct),
self.prog_lbl.configure(text=f"{done}/{total} ({pct:.0f}%)")])
def _patch_done_summary(self):
ok_c = sum(1 for r in self.results if r.get("status")=="OK")
p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED")
rb_c = sum(1 for r in self.results if r.get("reboot_required"))
ko_c = sum(1 for r in self.results if r.get("status")!="OK")
msg = (f"Patch termine !\n\n"
f"Total traite : {len(self.results)}\n"
f"OK : {ok_c}\n"
f"Patche : {p_c}\n"
f"Reboot requis: {rb_c}\n"
f"KO/Auth : {ko_c}")
messagebox.showinfo("Patch termine", msg)
if rb_c > 0:
messagebox.showwarning("Reboot requis",
f"{rb_c} serveur(s) necessitent un reboot.\n"
"Utiliser l'onglet Post-patching > Reboot.")
# Auto-générer le rapport dans l'onglet 4
self.root.after(500, self._generate_report)
# Basculer sur l'onglet rapport
self.root.after(600, lambda: self.nb.select(self.tab_report))
# ==========================================================================
# ONGLET 3 — POST-PATCHING
# ==========================================================================
def _post_patch_check(self):
targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not targets:
messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
return
threading.Thread(target=self._post_worker, args=(targets,), daemon=True).start()
def _post_worker(self, servers):
for s in servers:
self._log_post(f"\n── {s['server']} ──", "info")
client, eff = ssh_connect(s["server"], s["env"], self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if not client:
self._log_post(f" Connexion impossible", "ko")
continue
# Verifier que le pre-patch a ete execute aujourd'hui
check, _ = run_cmd(client, "find /tmp -name 'secops_services_avant_*' -mtime 0 2>/dev/null | head -1", 10)
if not check or not check.strip():
self._log_post(f" BLOQUE : pas de pre-patch aujourd'hui sur ce serveur", "ko")
self._log_post(f" Le fichier /tmp/secops_services_avant_* est absent ou trop ancien", "ko")
self._log_post(f" Lancez d'abord un patching (le pre-patch s'execute automatiquement)", "warn")
client.close()
continue
self._log_post(f" Pre-patch trouve : {check.strip()}", "ok")
self.audit("POST_CHECK", f"{s['server']} | verification post-patching")
out, _ = run_cmd(client, POST_PATCH_SCRIPT, 60)
for line in out.splitlines():
if "ALERTE" in line or "disparu" in line.lower():
self._log_post(f" {line}", "ko")
elif "WARN" in line or "nouveau" in line.lower():
self._log_post(f" {line}", "warn")
elif "OK" in line:
self._log_post(f" {line}", "ok")
else:
self._log_post(f" {line}", "info")
# Flux Libre Podman — verifier et redemarrer les pods (pas les BST)
is_fl = "flux libre" in s.get("domain", "").lower() and _is_fl_podman(s["server"])
if is_fl:
self._log_post(f" Flux Libre : verification pods Podman...", "info")
fl_out, _ = run_cmd(client, FL_POST_PATCH_SCRIPT, 60)
for line in fl_out.splitlines():
if "FL_RESTART=" in line:
pod = line.split("=", 1)[1]
self._log_post(f" Redemarrage pod: {pod}", "warn")
elif "FL_ALL_OK" in line:
self._log_post(f" Tous les pods sont actifs", "ok")
elif "FL_RESTARTED=" in line:
n = line.split("=", 1)[1]
self._log_post(f" {n} pod(s) redemarres", "warn")
elif "FL_USER=" in line:
self._log_post(f" {line}", "info")
elif "FL_NO_PODS" in line:
self._log_post(f" Pas de pods Podman", "info")
# Notification Teams : fin d'intervention (une seule fois par serveur)
if not s.get("_teams_fin_sent"):
self._send_teams_notification(s, "fin")
s["_teams_fin_sent"] = True
client.close()
def _ask_mark_patched(self):
"""Demande confirmation puis marque les serveurs patchés en vert dans Excel"""
patched = [s for s in self.servers
if s.get("patch_status") == "PATCHED" and s.get("selected")]
if not patched:
patched = [r for r in self.results if r.get("patch_status") == "PATCHED"]
if not patched:
messagebox.showwarning("Attention",
"Aucun serveur patche detecte.\n"
"Selectionner manuellement les serveurs a marquer.")
patched = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not patched:
return
names = "\n".join(s["server"] for s in patched[:10])
if len(patched) > 10:
names += f"\n... et {len(patched)-10} autres"
if messagebox.askyesno(
"Marquer comme patches dans Excel",
f"Marquer {len(patched)} serveur(s) en VERT dans le fichier Excel ?\n\n{names}\n\nCette action modifie le fichier Excel original.",
icon="question"):
self._mark_patched_excel(patched)
def _delete_old_snapshots(self):
"""Supprime les snapshots SLPM du patcheur courant > 3 jours"""
if not VSPHERE_OK:
messagebox.showwarning("pyVmomi manquant",
"pyVmomi non installe.\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080")
return
selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not selected:
messagebox.showwarning("Attention", "Aucun serveur selectionne.")
return
snap_id = self._get_snap_identifier()
if not messagebox.askyesno("Supprimer snapshots",
f"Supprimer vos snapshots SLPM de plus de 3 jours ?\n\n"
f"Patcheur : {snap_id}\n"
f"Serveurs : {len(selected)}\n\n"
f"Seuls les snapshots SLPM_{snap_id}_* seront supprimes.\n"
f"Les snapshots des autres patcheurs et les snapshots manuels ne seront pas touches."):
return
creds = self._ensure_vcenter_pwd()
if not creds:
self._log_post("Suppression annulee (pas de credentials vCenter)", "warn")
return
vs_user, vs_pwd = creds
self._log_post(f"Suppression snapshots SLPM_{snap_id}_* > 3 jours...", "info")
threading.Thread(target=self._snap_delete_worker,
args=(selected, vs_user, vs_pwd, 3), daemon=True).start()
def _snap_delete_worker(self, servers, vs_user, vs_pwd, max_days):
"""Supprime les snapshots SLPM du patcheur courant vieux de PLUS de max_days jours"""
snap_id = self._get_snap_identifier()
from datetime import timezone
cutoff = datetime.now(tz=timezone.utc) - __import__("datetime").timedelta(days=max_days)
connected_vcs = {}
for vc in VSPHERE_HOSTS:
try:
import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd,
sslContext=ctx, connectionPoolTimeout=10)
vc_content = si.RetrieveContent()
self._log_post(f"Connecte vCenter : {vc}", "ok")
connected_vcs[vc] = (si, vc_content)
except Exception as e:
self._log_post(f"Inaccessible : {vc}{str(e)[:50]}", "ko")
if not connected_vcs:
self._log_post("Aucun vCenter joignable", "ko")
return
for s in servers:
vc_order = get_vcenter_order_for_server(s["server"])
for vc in vc_order:
if vc not in connected_vcs:
continue
si, vc_content = connected_vcs[vc]
vm = self._find_vm(vc_content, s["server"])
if not vm:
continue
self._log_post(f"\n VM : {vm.name} sur {vc}", "info")
def get_snapshots(snap_list):
snaps = []
for snap in snap_list:
snaps.append(snap)
snaps.extend(get_snapshots(snap.childSnapshotList))
return snaps
if not vm.snapshot:
self._log_post(f" Aucun snapshot", "info")
continue
all_snaps = get_snapshots(vm.snapshot.rootSnapshotList)
for snap in all_snaps:
# Seuls les snapshots SLPM du patcheur courant
if not snap.name.startswith("SLPM_"):
self._log_post(f" {snap.name} — ignore (pas SLPM)", "info")
continue
if snap_id.lower() not in snap.name.lower():
self._log_post(f" {snap.name} — ignore (autre patcheur)", "info")
continue
snap_time = snap.createTime
if snap_time.tzinfo is None:
snap_time = snap_time.replace(tzinfo=timezone.utc)
age_days = (datetime.now(tz=timezone.utc) - snap_time).days
if age_days > max_days:
self._log_post(
f" {snap.name} ({age_days}j) — SUPPRESSION (>{max_days}j)", "warn")
try:
task = snap.snapshot.RemoveSnapshot_Task(removeChildren=False)
while task.info.state not in ("success","error"):
time.sleep(1)
if task.info.state == "success":
self._log_post(f" OK supprime : {snap.name}", "ok")
self.audit("SNAP_DELETE", f"{s['server']} | {snap.name} | {age_days}j")
else:
self._log_post(f" ERREUR suppression : {snap.name}", "ko")
except Exception as e:
self._log_post(f" ERREUR : {e}", "ko")
else:
# Récent (≤ max_days jours) → conserver
self._log_post(
f" Snapshot : {snap.name} ({age_days}j) — conservé (<= {max_days}j)", "info")
break
for vc, (si, _) in connected_vcs.items():
try: Disconnect(si)
except: pass
def _remove_old_kernels(self):
targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not targets:
messagebox.showwarning("Attention", "Aucun serveur selectionne.")
return
if not messagebox.askyesno("Confirmation",
f"Supprimer les anciens kernels sur {len(targets)} serveur(s) ?\n\n"
f"Garde uniquement le kernel actif + le dernier installe."):
return
def worker():
for s in targets:
self._log_post(f"\n{'='*50}", "info")
self._log_post(f"{s['server']} — Suppression anciens kernels", "info")
self._log_post(f"{'='*50}", "info")
client, _ = ssh_connect(s["server"], s["env"], self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if not client:
self._log_post(f" Connexion impossible", "ko")
continue
# 1. Lister les kernels installes
self._log_post(f" Kernels installes :", "info")
out, _ = run_cmd(client, "rpm -q kernel kernel-core 2>/dev/null | sort", 15)
if out:
for line in out.splitlines():
self._log_post(f" {line}", "info")
else:
self._log_post(f" Aucun kernel trouve", "warn")
# 2. Kernel actif
active, _ = run_cmd(client, "uname -r", 10)
self._log_post(f" Kernel actif : {active}", "ok")
# 3. Compter les kernels
count, _ = run_cmd(client, "rpm -q kernel kernel-core 2>/dev/null | wc -l", 10)
nb = int(count.strip()) if count.strip().isdigit() else 0
self._log_post(f" Nombre de kernels : {nb}", "info")
if nb <= 2:
self._log_post(f" Rien a supprimer (2 kernels max)", "ok")
client.close()
continue
# 4. Supprimer les anciens (garder 2 : actif + dernier)
self._log_post(f" Suppression (garde 2)...", "warn")
# RHEL 8+ : dnf
cmd = (
"if command -v dnf &>/dev/null; then "
" sudo dnf remove --oldinstallonly --setopt installonly_limit=2 -y 2>&1; "
"else "
" sudo package-cleanup --oldkernels --count=2 -y 2>&1; "
"fi"
)
out, _ = run_cmd(client, cmd, 120)
if out:
for line in out.splitlines()[-10:]:
if "removed" in line.lower() or "erasing" in line.lower():
self._log_post(f" {line}", "ok")
elif "error" in line.lower():
self._log_post(f" {line}", "ko")
elif "nothing" in line.lower() or "no packages" in line.lower():
self._log_post(f" {line}", "info")
else:
self._log_post(f" {line}", "info")
# 5. Verification
after, _ = run_cmd(client, "rpm -q kernel kernel-core 2>/dev/null | sort", 10)
self._log_post(f" Kernels restants :", "info")
if after:
for line in after.splitlines():
self._log_post(f" {line}", "ok")
client.close()
threading.Thread(target=worker, daemon=True).start()
def _undo_yum(self):
target = None
for s in self.servers:
if s.get("selected") and s["accord"]=="oui":
target = s
break
if not target:
messagebox.showwarning("Attention", "Sélectionner au moins un serveur.")
return
self._log_post(f"\n── {target['server']} — Historique yum ──", "info")
def worker():
client, _ = ssh_connect(target["server"], target["env"], self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if not client:
self._log_post(" ❌ Connexion impossible", "ko")
return
out, _ = run_cmd(client, "sudo yum history list 2>/dev/null | head -20", 20)
client.close()
self._log_post(out, "info")
# Demander l'ID à annuler
def ask():
tid = simpledialog.askstring("Undo yum",
f"Historique yum sur {target['server']} :\n\n{out}\n\nID de transaction à annuler :")
if tid:
threading.Thread(target=do_undo, args=(tid,), daemon=True).start()
def do_undo(tid):
self.audit("UNDO_YUM", f"{target['server']} | transaction {tid}")
self._log_post(f" Undo transaction {tid}...", "warn")
c2, _ = ssh_connect(target["server"], target["env"], self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if not c2:
self._log_post(" ❌ Connexion impossible", "ko")
return
out2, _ = run_cmd(c2, f"sudo yum history undo {tid} -y 2>&1 | tail -15", 120)
for line in out2.splitlines():
self._log_post(f" {line}", "ok" if "Complete" in line else "info")
c2.close()
self.root.after(0, ask)
threading.Thread(target=worker, daemon=True).start()
def _reboot_servers(self):
# audit moved to per-server reboot
reboot_list = [s for s in self.servers
if s.get("selected") and s.get("reboot_required") and s["accord"]=="oui"]
if not reboot_list:
reboot_list = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
if not reboot_list:
messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
return
names = "\n".join(s["server"] for s in reboot_list[:10])
if not messagebox.askyesno("⚠️ Confirmation reboot",
f"Redémarrer {len(reboot_list)} serveur(s) ?\n\n{names}", icon="warning"):
return
def worker():
for s in reboot_list:
self._log_post(f"\n── {s['server']} — Reboot ──", "warn")
client, _ = ssh_connect(s["server"], s["env"], self.settings,
self.pkey, self.pkey2, self.cyb_pwd)
if not client:
self._log_post(" ❌ Connexion impossible", "ko"); continue
# Notification Teams AVANT le reboot (une seule fois par serveur)
if not s.get("_teams_reboot_sent"):
self._send_teams_notification(s, "reboot")
s["_teams_reboot_sent"] = True
self.audit("REBOOT", f"{s['server']} | reboot dans 15s")
self._log_post(f" Notification reboot envoyee", "info")
# Reboot dans 15 secondes (laisse le temps a la synchro SharePoint)
run_cmd(client, "sudo nohup bash -c 'sleep 15 && shutdown -r now' &>/dev/null &", 5)
client.close()
self._log_post(f" Reboot lance (dans 15s)", "ok")
threading.Thread(target=worker, daemon=True).start()
# ==========================================================================
# ONGLET 4 — RAPPORT
# ==========================================================================
def _generate_report(self):
"""Génère le rapport visuel — KPI widgets + tableau détaillé + log brut"""
if not self.results:
messagebox.showwarning("Attention", "Aucun résultat de patch disponible.")
return
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
week = date.today().isocalendar()[1]
# ── Calcul des KPI ────────────────────────────────────────────────────
total = len(self.results)
ok_c = sum(1 for r in self.results if r.get("status")=="OK")
p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED")
uptodate = sum(1 for r in self.results if r.get("patch_status") in ("UP_TO_DATE","UP_TO_DATE"))
rb_c = sum(1 for r in self.results if r.get("reboot_required"))
ko_c = sum(1 for r in self.results if r.get("status")!="OK")
# Mettre à jour les widgets KPI
self.kpi_widgets["total" ].configure(text=str(total))
self.kpi_widgets["ok" ].configure(text=str(ok_c))
self.kpi_widgets["patched" ].configure(text=str(p_c))
self.kpi_widgets["uptodate"].configure(text=str(uptodate))
self.kpi_widgets["reboot" ].configure(text=str(rb_c))
self.kpi_widgets["ko" ].configure(text=str(ko_c))
# ── Peupler le tableau détaillé ───────────────────────────────────────
for item in self.report_tree.get_children():
self.report_tree.delete(item)
for r in self.results:
accord = "OUI" if r.get("accord")=="oui" else "NON"
snap = "OK" if r.get("snap_done") else ("PHY" if r.get("is_physical") else "")
conn = r.get("status","?")
ps = r.get("patch_status","?")
rb = "OUI" if r.get("reboot_required") else ""
detail = r.get("patch_detail","")[:60]
# Choisir le tag couleur
if r.get("status") != "OK":
tag = "ko"
elif r.get("patch_status") == "PATCHED":
tag = "patched"
elif r.get("reboot_required"):
tag = "reboot"
elif r.get("patch_status") in ("UP_TO_DATE",):
tag = "uptodate"
else:
tag = "ok"
self.report_tree.insert("", "end", values=(
r.get("server",""),
r.get("env",""),
r.get("domain",""),
accord,
snap,
conn,
ps,
rb,
detail,
), tags=(tag,))
# ── Log brut ──────────────────────────────────────────────────────────
self.report_txt.configure(state="normal")
self.report_txt.delete("1.0", "end")
def w(txt, tag="info"):
self.report_txt.insert("end", txt+"\n", tag)
w(f"{'='*70}", "title")
w(f" RAPPORT PATCHING — Semaine {week}{now}", "title")
w(f" Fichier : {self.excel_var.get()}", "info")
w(f" Intervenant : {self.settings.get('patcher','')}", "info")
w(f"{'='*70}", "title")
w("")
w(f" RESUME : {total} traites | {ok_c} OK | {p_c} patches | {rb_c} reboot | {ko_c} KO", "info")
w("")
w(f" {'SERVEUR':<35} {'STATUT':<12} {'PATCH':<16} {'REBOOT'}", "title")
w(f" {'-'*66}", "info")
for r in self.results:
status = r.get("status","?")
ps = r.get("patch_status","?")
rb = "REBOOT" if r.get("reboot_required") else ""
tag = "ok" if status=="OK" else "ko"
if r.get("patch_status") == "PATCHED": tag = "ok"
if r.get("reboot_required"): tag = "warn"
w(f" {r.get('server',''):<35} {status:<12} {ps:<16} {rb}", tag)
if r.get("patch_detail"):
w(f"{r['patch_detail'][:80]}", "info")
w("")
w(f"{'='*70}", "title")
self.report_txt.configure(state="disabled")
def _mark_patched_excel(self, servers_to_mark):
"""Met le fond vert sur les lignes patchees dans le fichier Excel.
Utilise une copie temporaire pour eviter les conflits OneDrive/verrou."""
import shutil
import tempfile
filepath = self.excel_var.get().strip()
sheet = self.sheet_var.get()
if not filepath or not os.path.exists(filepath):
messagebox.showerror("Erreur Excel", "Fichier Excel introuvable")
return
try:
from openpyxl import load_workbook
from openpyxl.styles import PatternFill
# Copie temporaire pour eviter le verrou OneDrive
tmp_dir = tempfile.mkdtemp()
tmp_path = os.path.join(tmp_dir, os.path.basename(filepath))
shutil.copy2(filepath, tmp_path)
wb = load_workbook(tmp_path)
ws = wb[sheet]
green_fill = PatternFill(start_color="00B050", end_color="00B050", fill_type="solid")
marked = 0
for s in servers_to_mark:
row_num = s.get("excel_row")
if not row_num:
for row in ws.iter_rows():
if row[0].value == s["server"]:
row_num = row[0].row
break
if row_num:
for col in range(1, ws.max_column + 1):
ws.cell(row=row_num, column=col).fill = green_fill
marked += 1
# Sauvegarder dans le temp
wb.save(tmp_path)
wb.close()
# Tenter de recopier vers l'original
try:
shutil.copy2(tmp_path, filepath)
self._log_patch(f" {marked} ligne(s) marquee(s) en vert dans Excel", "ok")
messagebox.showinfo("Excel mis a jour",
f"{marked} serveur(s) marques comme patches (fond vert) dans :\n{filepath}")
except PermissionError:
# OneDrive verrouille le fichier — sauvegarder a cote
backup_path = filepath.replace(".xlsx", f"_patched_{datetime.now():%Y%m%d_%H%M}.xlsx")
shutil.copy2(tmp_path, backup_path)
self._log_patch(f" Excel verrouille, sauvegarde dans : {backup_path}", "warn")
messagebox.showwarning("Excel verrouille",
f"Le fichier Excel est verrouille (OneDrive).\n\n"
f"Une copie a ete sauvegardee :\n{backup_path}\n\n"
f"Fermez le fichier original puis copiez la version patchee.")
# Nettoyage temp
try:
os.remove(tmp_path)
os.rmdir(tmp_dir)
except Exception:
pass
self.audit("MARK_EXCEL", f"{marked} serveurs marques, sheet={sheet}")
except Exception as e:
messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}")
def _export_csv(self):
if not self.results:
messagebox.showwarning("Attention","Aucun résultat.")
return
rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports")
os.makedirs(rep_dir, exist_ok=True)
csv_file = os.path.join(rep_dir, f"patch_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d_%H%M%S}.csv")
fields = ["server","env","domain","app","accord","status","patch_status",
"patch_detail","reboot_required","snap_done","is_physical"]
with open(csv_file,"w",newline="",encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
w.writeheader(); w.writerows(self.results)
messagebox.showinfo("Export CSV", f"Fichier : {csv_file}")
def _export_dokuwiki(self):
if not self.results:
messagebox.showwarning("Attention","Aucun résultat.")
return
rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports")
os.makedirs(rep_dir, exist_ok=True)
wiki_file = os.path.join(rep_dir, f"patch_wiki_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d}.txt")
week = date.today().isocalendar()[1]
with open(wiki_file,"w",encoding="utf-8") as f:
f.write(f"====== Rapport Patching — Semaine {week} ======\n\n")
f.write(f"//Généré le {datetime.now():%d/%m/%Y %H:%M} — MYPCZEN / SANEF DSI-SOC//\n\n")
f.write("===== Résumé =====\n\n")
f.write(f"^ Serveurs traités ^ Patchés ^ Reboot requis ^ KO ^\n")
ok_c = sum(1 for r in self.results if r.get("status")=="OK")
p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED")
rb_c = sum(1 for r in self.results if r.get("reboot_required"))
ko_c = sum(1 for r in self.results if r.get("status")!="OK")
f.write(f"| {len(self.results)} | {p_c} | {rb_c} | {ko_c} |\n\n")
f.write("===== Détail =====\n\n")
f.write("^ Serveur ^ Env ^ Statut ^ Patch ^ Reboot ^ Accord ^ Snapshot ^\n")
for r in self.results:
rb = "⚠ OUI" if r.get("reboot_required") else "NON"
f.write(f"| {r['server']} | {r['env']} | {r.get('status','?')} | "
f"{r.get('patch_status','?')} | {rb} | "
f"{'OUI' if r.get('accord')=='oui' else 'NON'} | "
f"{'OUI' if r.get('snap_done') else 'NON'} |\n")
messagebox.showinfo("Export DokuWiki", f"Fichier : {wiki_file}")
# ==========================================================================
# ONGLET 5 — AUDIT
# ==========================================================================
def _build_tab_audit(self):
tab = self.tab_audit
BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
hf = tk.Frame(tab, bg="#181825", pady=6)
hf.pack(fill="x")
tk.Label(hf, text="Journal d'audit", bg="#181825", fg=ACCENT,
font=("Consolas", 13, "bold")).pack(side="left", padx=12)
tk.Button(hf, text="Rafraichir", bg=BTN, fg=FG, font=("Consolas", 9),
command=self._refresh_audit).pack(side="right", padx=12)
tk.Button(hf, text="Exporter CSV", bg=BTN, fg=FG, font=("Consolas", 9),
command=self._export_audit_csv).pack(side="right", padx=4)
tk.Button(hf, text="Changer mon mot de passe", bg="#fab387", fg="#1e1e2e",
font=("Consolas", 9), command=self._change_my_password).pack(side="right", padx=4)
cols = ("timestamp", "user", "action", "details")
self.audit_tree = ttk.Treeview(tab, columns=cols, show="headings")
self.audit_tree.heading("timestamp", text="Date/Heure")
self.audit_tree.heading("user", text="Utilisateur")
self.audit_tree.heading("action", text="Action")
self.audit_tree.heading("details", text="Details")
self.audit_tree.column("timestamp", width=160)
self.audit_tree.column("user", width=120)
self.audit_tree.column("action", width=150)
self.audit_tree.column("details", width=500)
sb = ttk.Scrollbar(tab, orient="vertical", command=self.audit_tree.yview)
self.audit_tree.configure(yscrollcommand=sb.set)
self.audit_tree.pack(side="left", fill="both", expand=True, padx=6, pady=4)
sb.pack(side="right", fill="y", pady=4)
self._refresh_audit()
def _refresh_audit(self):
self.audit_tree.delete(*self.audit_tree.get_children())
for log in self.db.get_logs(500):
self.audit_tree.insert("", "end", values=(
log["timestamp"], log["username"], log["action"], log.get("details", "")))
def _export_audit_csv(self):
logs = self.db.get_logs(5000)
if not logs:
messagebox.showinfo("Info", "Aucun log"); return
p = filedialog.asksaveasfilename(defaultextension=".csv",
filetypes=[("CSV", "*.csv")],
initialfile=f"audit_patch_{datetime.now():%Y%m%d_%H%M%S}.csv")
if not p: return
with open(p, "w", newline="", encoding="utf-8-sig") as f:
w = csv.writer(f, delimiter=";")
w.writerow(["Timestamp", "Utilisateur", "Action", "Details"])
for l in logs:
w.writerow([l["timestamp"], l["username"], l["action"], l.get("details", "")])
self.audit("EXPORT_AUDIT", f"{len(logs)} entries -> {p}")
def _change_my_password(self):
dlg = ChangePasswordDialog(self.root, self.db, self.current_user["username"])
# Centrer sur le parent
if dlg.success:
messagebox.showinfo("OK", "Mot de passe modifie avec succes")
# ==========================================================================
# ONGLET 6 — UTILISATEURS (admin only)
# ==========================================================================
def _build_tab_users(self):
tab = self.tab_users
BG=self.BG; BG2=self.BG2; FG=self.FG; ACCENT=self.ACCENT; BTN=self.BTN
# Creation user
cf = tk.Frame(tab, bg="#181825", pady=6)
cf.pack(fill="x")
tk.Label(cf, text="Gestion des utilisateurs", bg="#181825", fg=ACCENT,
font=("Consolas", 13, "bold")).pack(side="left", padx=12)
af = tk.Frame(tab, bg=BG, pady=6)
af.pack(fill="x", padx=12)
tk.Label(af, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(0, 4))
self.new_user_var = tk.StringVar()
tk.Entry(af, textvariable=self.new_user_var, bg=BG2, fg=FG,
font=("Consolas", 10), width=15, insertbackground=FG).pack(side="left", padx=4)
tk.Label(af, text="MDP :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(8, 4))
self.new_pwd_var = tk.StringVar()
tk.Entry(af, textvariable=self.new_pwd_var, bg=BG2, fg=FG,
font=("Consolas", 10), width=15, show="*", insertbackground=FG).pack(side="left", padx=4)
tk.Label(af, text="Role :", bg=BG, fg=FG, font=("Consolas", 10)).pack(side="left", padx=(8, 4))
self.new_role_var = tk.StringVar(value="operator")
tk.OptionMenu(af, self.new_role_var, "admin", "operator", "viewer").pack(side="left", padx=4)
tk.Button(af, text="Creer", bg="#40a02b", fg="white", font=("Consolas", 9, "bold"),
command=self._create_user).pack(side="left", padx=8)
# Liste users
cols = ("username", "role", "locked", "must_change", "last_login")
self.users_tree = ttk.Treeview(tab, columns=cols, show="headings")
self.users_tree.heading("username", text="Utilisateur")
self.users_tree.heading("role", text="Role")
self.users_tree.heading("locked", text="Verrouille")
self.users_tree.heading("must_change", text="Chg. MDP")
self.users_tree.heading("last_login", text="Derniere connexion")
self.users_tree.column("username", width=150)
self.users_tree.column("role", width=100)
self.users_tree.column("locked", width=100)
self.users_tree.column("must_change", width=100)
self.users_tree.column("last_login", width=180)
self.users_tree.pack(fill="both", expand=True, padx=12, pady=6)
bf = tk.Frame(tab, bg=BG, pady=6)
bf.pack(fill="x", padx=12)
tk.Button(bf, text="Supprimer", bg="#f38ba8", fg="#1e1e2e",
font=("Consolas", 9, "bold"), command=self._delete_user).pack(side="left", padx=4)
tk.Button(bf, text="Deverrouiller", bg="#fab387", fg="#1e1e2e",
font=("Consolas", 9, "bold"), command=self._unlock_user).pack(side="left", padx=4)
tk.Button(bf, text="Reset MDP", bg=BTN, fg=FG,
font=("Consolas", 9), command=self._reset_user_pwd).pack(side="left", padx=4)
tk.Button(bf, text="Rafraichir", bg=BTN, fg=FG,
font=("Consolas", 9), command=self._refresh_users).pack(side="right", padx=4)
self._refresh_users()
def _refresh_users(self):
self.users_tree.delete(*self.users_tree.get_children())
for u in self.db.list_users():
self.users_tree.insert("", "end", values=(
u["username"], u["role"],
"OUI" if u["locked"] else "",
"OUI" if u["must_change_pwd"] else "",
u["last_login"] or ""))
def _create_user(self):
un = self.new_user_var.get().strip()
pw = self.new_pwd_var.get().strip()
role = self.new_role_var.get()
if not un or not pw:
messagebox.showwarning("Erreur", "Saisir un nom et un mot de passe"); return
if self.db.create_user(un, pw, role):
self.audit("CREATE_USER", f"{un} (role={role})")
self.new_user_var.set(""); self.new_pwd_var.set("")
self._refresh_users()
else:
messagebox.showerror("Erreur", f"L'utilisateur '{un}' existe deja")
def _delete_user(self):
sel = self.users_tree.selection()
if not sel: return
un = self.users_tree.item(sel[0])["values"][0]
if un == "admin":
messagebox.showwarning("Erreur", "Impossible de supprimer admin"); return
if messagebox.askyesno("Confirmer", f"Supprimer '{un}' ?"):
self.db.delete_user(un)
self.audit("DELETE_USER", un)
self._refresh_users()
def _unlock_user(self):
sel = self.users_tree.selection()
if not sel: return
un = self.users_tree.item(sel[0])["values"][0]
self.db.unlock_user(un)
self.audit("UNLOCK_USER", un)
self._refresh_users()
def _reset_user_pwd(self):
sel = self.users_tree.selection()
if not sel: return
un = self.users_tree.item(sel[0])["values"][0]
new_pwd = simpledialog.askstring("Reset", f"Nouveau mot de passe pour {un} :", show="*")
if new_pwd:
self.db.reset_password(un, new_pwd)
self.audit("RESET_PASSWORD", un)
self._refresh_users()
# ==============================================================================
# MAIN
# ==============================================================================
if __name__ == "__main__":
db = Database()
root = tk.Tk()
root.withdraw() # Cacher pendant le login
# Première initialisation : afficher le mot de passe admin généré
if hasattr(db, "_init_password"):
messagebox.showinfo(
"Premier lancement",
f"Base de données initialisée.\n\n"
f"Compte admin créé avec le mot de passe :\n\n"
f" {db._init_password}\n\n"
f"⚠ NOTEZ-LE MAINTENANT — il ne sera plus affiché.\n"
f"Vous devrez le changer à la première connexion.",
parent=root)
del db._init_password
login = LoginDialog(root, db)
if not login.result:
root.destroy()
sys.exit(0)
user = login.result
# Forcer changement de mot de passe au premier login
if user.get("must_change_pwd"):
dlg = ChangePasswordDialog(root, db, user["username"], forced=True)
if not dlg.success:
root.destroy()
sys.exit(0)
root.deiconify()
app = PatchManagerV2(root, db, user)
root.mainloop()