3562 lines
161 KiB
Python
3562 lines
161 KiB
Python
#!/usr/bin/env python3
|
|
# ==============================================================================
|
|
# patch_manager_v2.py — SANEF Linux Patch Manager GUI v2
|
|
# Gestion complète du patching Linux via Excel + SSH + vSphere
|
|
#
|
|
# Prérequis : py -m pip install paramiko openpyxl requests pyVmomi --proxy http://proxy.sanef.fr:8080
|
|
# Usage : py patch_manager_v2.py
|
|
# Auteur : MYPCZEN / SANEF DSI - SOC
|
|
# ==============================================================================
|
|
|
|
import tkinter as tk
|
|
from tkinter import ttk, filedialog, scrolledtext, messagebox, simpledialog
|
|
import threading
|
|
import paramiko
|
|
import socket
|
|
import time
|
|
import csv
|
|
import os
|
|
import sys
|
|
import json
|
|
from datetime import datetime, date
|
|
from copy import deepcopy
|
|
import sqlite3
|
|
import hashlib
|
|
import secrets
|
|
import ctypes
|
|
|
|
# openpyxl optionnel — installé si absent
|
|
try:
|
|
import openpyxl
|
|
except ImportError:
|
|
os.system("py -m pip install openpyxl --proxy http://proxy.sanef.fr:8080 -q")
|
|
import openpyxl
|
|
|
|
# pyVmomi optionnel — pour vSphere
|
|
try:
|
|
from pyVim.connect import SmartConnect, Disconnect
|
|
from pyVmomi import vim
|
|
VSPHERE_OK = True
|
|
except ImportError:
|
|
VSPHERE_OK = False
|
|
|
|
# ==============================================================================
|
|
# CONSTANTES
|
|
# ==============================================================================
|
|
# Application version; displayed in the login dialog footer.
VERSION = "2.0"

# vCenter servers known to the tool.
# NOTE(review): not referenced in this part of the file; presumably used by the
# vSphere features further down - confirm.
VSPHERE_HOSTS = [
    "vpgesavcs1.sanef.groupe",
    "vpmetavcs1.sanef.groupe",
    "vpsicavcs1.sanef.groupe",
]

# yum "--exclude" options applied by build_yum_command() to a global patch on
# every domain except "flux libre". Adjacent literals are concatenated, so each
# fragment except the last must end with a space.
YUM_EXCLUDES_STD = (
    "--exclude=*mongodb* --exclude=*mysql* --exclude=*postgres* "
    "--exclude=*mariadb* --exclude=*oracle* --exclude=*pgdg* --exclude=*php* "
    "--exclude=*java* --exclude=*redis* --exclude=*elasticsearch* --exclude=*nginx* "
    "--exclude=*mod_ssl* --exclude=*haproxy* --exclude=*certbot* "
    "--exclude=*python-certbot* --exclude=*docker* --exclude=*podman* "
    "--exclude=*centreon* --exclude=*qwserver* "
    "--exclude=*ansible* --exclude=*node* --exclude=*tina* --exclude=*memcached* "
    "--exclude=*nextcloud* --exclude=*pgbouncer* --exclude=*pgpool* "
    "--exclude=*pgbadger* --exclude=*psycopg2* --exclude=*barman* --exclude=*kibana* "
    "--exclude=*sdcss*"  # Symantec DCS agent + sdcss-kmod kernel module
)
# Exclude list used by build_yum_command() when the domain contains "flux libre".
YUM_EXCLUDES_FLUX_LIBRE = "--exclude=*podman*"
|
|
|
|
PRE_PATCH_SCRIPT = r"""
|
|
cat > /tmp/secops_pre_patching.sh << 'EOF'
|
|
HOSTNAME=$(hostname)
|
|
SNAPSHOT_DIR="/tmp"
|
|
systemctl list-units --type=service --state=running --no-pager \
|
|
| awk '{print $1}' | grep '\.service$' \
|
|
> ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt
|
|
echo "Services sauvegardes : $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt)"
|
|
ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt
|
|
ss -tlnup | awk 'NR>1 && $7 != "" {
|
|
match($7, /users:\(\("([^"]+)"/, arr)
|
|
split($5, addr, ":")
|
|
port = addr[length(addr)]
|
|
if (arr[1] != "" && port+0 < 32768) print port, arr[1]
|
|
}' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt
|
|
echo "Ports sauvegardes : $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt)"
|
|
echo "PRE_PATCH_OK"
|
|
EOF
|
|
bash /tmp/secops_pre_patching.sh
|
|
"""
|
|
|
|
POST_PATCH_SCRIPT = r"""
|
|
cat > /tmp/secops_post_patching.sh << 'EOF'
|
|
HOSTNAME=$(hostname)
|
|
SNAPSHOT_DIR="/tmp"
|
|
GREEN='\033[0;32m'; RED='\033[0;31m'; YELLOW='\033[0;33m'; NC='\033[0m'
|
|
RAPPORT="/tmp/rapport_patching_${HOSTNAME}_$(date +%Y%m%d_%H%M).txt"
|
|
systemctl list-units --type=service --state=running --no-pager \
|
|
| awk '{print $1}' | grep '\.service$' \
|
|
> ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt
|
|
DISPARUS_SVC=$(comm -23 \
|
|
<(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \
|
|
<(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \
|
|
| grep -v "user@")
|
|
APPARUS_SVC=$(comm -13 \
|
|
<(sort ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) \
|
|
<(sort ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt) \
|
|
| grep -v "setroubleshootd\|user@")
|
|
echo "--- SERVICES ---" | tee -a ${RAPPORT}
|
|
echo "Avant: $(wc -l < ${SNAPSHOT_DIR}/secops_services_avant_${HOSTNAME}.txt) | Apres: $(wc -l < ${SNAPSHOT_DIR}/secops_services_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT}
|
|
[ -z "$DISPARUS_SVC" ] && echo "OK - Aucun service disparu" | tee -a ${RAPPORT} || { echo "ALERTE services disparus:" | tee -a ${RAPPORT}; echo "$DISPARUS_SVC" | tee -a ${RAPPORT}; }
|
|
[ -z "$APPARUS_SVC" ] && echo "OK - Aucun nouveau service" | tee -a ${RAPPORT} || { echo "WARN nouveaux services:" | tee -a ${RAPPORT}; echo "$APPARUS_SVC" | tee -a ${RAPPORT}; }
|
|
ss -tlnup > ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt
|
|
ss -tlnup | awk 'NR>1 && $7 != "" {
|
|
match($7, /users:\(\("([^"]+)"/, arr)
|
|
split($5, addr, ":")
|
|
port = addr[length(addr)]
|
|
if (arr[1] != "" && port+0 < 32768) print port, arr[1]
|
|
}' | sort -u > ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt
|
|
PORTS_DISPARUS=$(comm -23 \
|
|
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \
|
|
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt))
|
|
PORTS_APPARUS=$(comm -13 \
|
|
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_avant_${HOSTNAME}.txt) \
|
|
<(sort ${SNAPSHOT_DIR}/secops_ports_detail_apres_${HOSTNAME}.txt))
|
|
echo "--- PORTS ---" | tee -a ${RAPPORT}
|
|
echo "Avant: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_avant_${HOSTNAME}.txt) | Apres: $(grep -c LISTEN ${SNAPSHOT_DIR}/secops_ports_apres_${HOSTNAME}.txt)" | tee -a ${RAPPORT}
|
|
[ -z "$PORTS_DISPARUS" ] && echo "OK - Aucun port disparu" | tee -a ${RAPPORT} || { echo "ALERTE ports disparus:" | tee -a ${RAPPORT}; echo "$PORTS_DISPARUS" | tee -a ${RAPPORT}; }
|
|
[ -z "$PORTS_APPARUS" ] && echo "OK - Aucun nouveau port" | tee -a ${RAPPORT} || { echo "WARN nouveaux ports:" | tee -a ${RAPPORT}; echo "$PORTS_APPARUS" | tee -a ${RAPPORT}; }
|
|
echo "" | tee -a ${RAPPORT}
|
|
echo "RAPPORT: ${RAPPORT}"
|
|
echo "POST_PATCH_OK"
|
|
EOF
|
|
bash /tmp/secops_post_patching.sh
|
|
"""
|
|
|
|
# "Flux libre" pre-patch snippet: records which podman pods are running so the
# post-patch snippet can restart the ones that did not come back.
# It writes /tmp/secops_fl_pre_patch.sh through a quoted heredoc and runs it.
# Output markers (one per line): FL_NO_PODS | FL_USER=<user>,
# FL_PODS_SAVED=<count>, FL_PRE_OK.
#
# The snapshot path is computed once and stored in $SNAPSHOT. The previous
# version redirected from the glob "/tmp/fl_pods_avant_<host>_*.txt", which bash
# rejects as an "ambiguous redirect" as soon as an older timestamped snapshot
# is still present in /tmp, and it evaluated `date` twice for the same file.
FL_PRE_PATCH_SCRIPT = r"""
cat > /tmp/secops_fl_pre_patch.sh << 'FLEOF'
# Detecter l'utilisateur applicatif podman
APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1)
if [ -z "$APP_USER" ]; then
echo "FL_NO_PODS"
exit 0
fi
echo "FL_USER=$APP_USER"
# Snapshot pods running via sudo su
SNAPSHOT="/tmp/fl_pods_avant_$(hostname)_$(date +%Y%m%d_%H%M).txt"
sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}" --filter "status=running"' \
> "$SNAPSHOT"
echo "FL_PODS_SAVED=$(wc -l < "$SNAPSHOT")"
# Status boo_manage si disponible
if command -v boo_manage &>/dev/null || sudo su - $APP_USER -c "which boo_manage" &>/dev/null; then
sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true
fi
echo "FL_PRE_OK"
FLEOF
bash /tmp/secops_fl_pre_patch.sh
"""
|
|
|
|
# "Flux libre" post-patch snippet: compares the pods currently running with the
# most recent /tmp/fl_pods_avant_<host>_*.txt snapshot and restarts every pod
# that is missing (boo_manage first, plain `podman start` as fallback).
# Output markers: FL_NO_PODS | FL_USER=<user>, FL_NO_SNAPSHOT,
# FL_RESTART=<pod> (one per restarted pod), FL_ALL_OK | FL_RESTARTED=<n>,
# FL_POST_OK.
#
# Changes compared with the previous version:
#   - pod names are matched as fixed whole lines (grep -xF) instead of being
#     interpolated into a regular expression, so names containing "." or other
#     regex metacharacters cannot match the wrong pod;
#   - empty lines are skipped: an empty snapshot used to yield one empty "pod"
#     that was reported as stopped and passed to boo_manage/podman;
#   - "$SNAPSHOT" is quoted when it is read.
FL_POST_PATCH_SCRIPT = r"""
cat > /tmp/secops_fl_post_patch.sh << 'FLEOF'
APP_USER=$(ps aux | grep -E "conmon|podman" | grep -v grep | awk '{print $1}' | sort -u | head -1)
if [ -z "$APP_USER" ]; then
echo "FL_NO_PODS"
exit 0
fi
echo "FL_USER=$APP_USER"
SNAPSHOT=$(ls -t /tmp/fl_pods_avant_$(hostname)_*.txt 2>/dev/null | head -1)
if [ -z "$SNAPSHOT" ]; then
echo "FL_NO_SNAPSHOT"
sudo su - $APP_USER -c "boo_manage -a -c start" 2>/dev/null || true
exit 0
fi
AVANT=$(cat "$SNAPSHOT")
APRES=$(sudo su - $APP_USER -c 'XDG_RUNTIME_DIR=/run/user/$(id -u) podman ps --format "{{.Names}}"')
ARRETES=0
while IFS= read -r pod; do
[ -z "$pod" ] && continue
if ! echo "$APRES" | grep -qxF -- "$pod"; then
echo "FL_RESTART=$pod"
sudo su - $APP_USER -c "boo_manage -p ${pod} -c start" 2>/dev/null || \
sudo su - $APP_USER -c "XDG_RUNTIME_DIR=/run/user/\$(id -u) podman start ${pod}" 2>/dev/null
ARRETES=$((ARRETES + 1))
fi
done <<< "$AVANT"
if [ $ARRETES -eq 0 ]; then
echo "FL_ALL_OK"
else
echo "FL_RESTARTED=$ARRETES"
fi
sudo su - $APP_USER -c "boo_manage -a -c status" 2>/dev/null || true
echo "FL_POST_OK"
FLEOF
bash /tmp/secops_fl_post_patch.sh
"""
|
|
|
|
# Patch presets: label -> space-separated package list.
# An empty string carries no package list (build_yum_command() then applies the
# exclude lists to a global update).
# NOTE(review): "CUSTOM" looks like a sentinel the GUI replaces with free text
# typed by the operator - confirm against the caller.
PRESET_PACKAGES = {
    "Patch global (avec excludes)": "",
    "Java": "java-1.8.0-openjdk java-1.8.0-openjdk-headless",
    "OpenSSL": "openssl",
    "OpenSSH": "openssh openssh-server openssh-clients",
    "gnutls + libsoup + rhc": "gnutls libsoup rhc",
    "glibc": "glibc",
    "mariadb-libs": "mariadb-libs",
    "curl": "curl libcurl",
    "Personnalise...": "CUSTOM",
}

# Per-user files stored in the home directory: JSON settings and the SQLite
# database holding accounts and the audit log.
SETTINGS_FILE = os.path.join(os.path.expanduser("~"), ".patch_manager_settings.json")
DB_PATH = os.path.join(os.path.expanduser("~"), ".patch_manager.db")

# ==============================================================================
# DATABASE — Auth + Audit
# ==============================================================================
# Module-wide lock taken by Database around every use of its shared SQLite
# connection (the connection is opened with check_same_thread=False).
_db_lock = threading.Lock()
|
|
|
|
def _hash_password(password, salt=None):
    """Return ``"<salt>$<hex digest>"`` for *password*.

    The digest is PBKDF2-HMAC-SHA256 with 310,000 iterations. When *salt* is
    omitted a fresh 16-byte random salt (32 hex characters) is generated;
    passing the salt of a stored hash reproduces that hash for verification.
    """
    salt = secrets.token_hex(16) if salt is None else salt
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        salt.encode(),
        310_000,
    ).hex()
    return "$".join((salt, digest))
|
|
|
|
def _verify_password(password, stored):
    """Check *password* against a ``"<salt>$<hex digest>"`` value.

    Args:
        password: clear-text password typed by the user.
        stored: value previously produced by ``_hash_password``.

    Returns:
        True when the password matches; False when it does not or when
        *stored* is not a string in the expected ``salt$digest`` format.
    """
    import hmac

    if not isinstance(stored, str) or "$" not in stored:
        return False
    salt, _ = stored.split("$", 1)
    candidate = _hash_password(password, salt)
    # Constant-time comparison; bytes are used because compare_digest() only
    # accepts ASCII-only str values.
    return hmac.compare_digest(candidate.encode("utf-8"), stored.encode("utf-8"))
|
|
|
|
class Database:
    """SQLite store for local accounts (``users``) and the ``audit_log`` table.

    One connection is shared by all threads (``check_same_thread=False``);
    every access to it is serialised with the module-level ``_db_lock``.
    Usernames are matched case-insensitively everywhere, consistently with
    ``authenticate``.
    """

    # Consecutive wrong passwords after which an account is locked.
    MAX_FAILED_ATTEMPTS = 3

    def __init__(self):
        """Open (or create) the database at ``DB_PATH`` and ensure the schema."""
        self.conn = sqlite3.connect(DB_PATH, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self._init_tables()

    def _exec(self, sql, params=()):
        """Execute one statement under the lock and return the cursor."""
        with _db_lock:
            return self.conn.execute(sql, params)

    def _commit(self):
        """Commit the current transaction under the lock."""
        with _db_lock:
            self.conn.commit()

    def _write(self, sql, params=()):
        """Execute one modifying statement and commit it in a single locked step.

        Keeping execute + commit under one lock acquisition prevents another
        thread from committing (or writing into) a half-finished transaction.
        """
        with _db_lock:
            try:
                cursor = self.conn.execute(sql, params)
                self.conn.commit()
            except sqlite3.Error:
                self.conn.rollback()
                raise
            return cursor

    def _query(self, sql, params=()):
        """Run a SELECT and return all rows, fetched while the lock is held."""
        with _db_lock:
            return self.conn.execute(sql, params).fetchall()

    def _init_tables(self):
        """Create the tables if needed and seed the default ``admin`` account."""
        with _db_lock:
            self.conn.executescript("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    role TEXT NOT NULL DEFAULT 'operator',
                    must_change_pwd INTEGER NOT NULL DEFAULT 1,
                    failed_attempts INTEGER NOT NULL DEFAULT 0,
                    locked INTEGER NOT NULL DEFAULT 0,
                    created_at TEXT DEFAULT (datetime('now','localtime')),
                    last_login TEXT
                );
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT DEFAULT (datetime('now','localtime')),
                    username TEXT,
                    action TEXT NOT NULL,
                    details TEXT
                );
            """)
            # Default admin account, created once.
            row = self.conn.execute("SELECT id FROM users WHERE username='admin'").fetchone()
            if not row:
                # NOTE(security): the initial admin password is hard-coded in
                # the source; must_change_pwd=1 is the only mitigation.
                # Consider generating it at first start instead.
                h = _hash_password("pzBL4l9p24t*$zbYnxHs1mBvp")
                self.conn.execute(
                    "INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,?)",
                    ("admin", h, "admin", 1))
                self.conn.commit()

    def authenticate(self, username, password):
        """Validate credentials.

        Returns:
            ``(user_dict, None)`` on success, ``(None, message)`` on failure.
            A wrong password increments ``failed_attempts``; reaching
            ``MAX_FAILED_ATTEMPTS`` locks the account. ``user_dict`` is the row
            as read before the success update.
        """
        limit = self.MAX_FAILED_ATTEMPTS
        with _db_lock:
            row = self.conn.execute(
                "SELECT * FROM users WHERE LOWER(username)=LOWER(?)", (username,)
            ).fetchone()
        if not row:
            return None, "Utilisateur inconnu"
        if row["locked"]:
            return None, f"Compte verrouille ({limit} tentatives). Contactez l'admin."
        # PBKDF2 is deliberately slow: verify outside the lock.
        if not _verify_password(password, row["password_hash"]):
            attempts = row["failed_attempts"] + 1
            if attempts >= limit:
                self._write("UPDATE users SET failed_attempts=?, locked=1 WHERE id=?",
                            (attempts, row["id"]))
                return None, f"Compte verrouille apres {limit} echecs."
            self._write("UPDATE users SET failed_attempts=? WHERE id=?",
                        (attempts, row["id"]))
            return None, f"Mot de passe incorrect ({limit - attempts} essai(s) restant(s))"
        # Success: reset the counter and stamp the login time.
        self._write(
            "UPDATE users SET failed_attempts=0, last_login=datetime('now','localtime') WHERE id=?",
            (row["id"],))
        return dict(row), None

    def change_password(self, username, new_password):
        """Store a new password for *username* and clear ``must_change_pwd``."""
        h = _hash_password(new_password)
        self._write(
            "UPDATE users SET password_hash=?, must_change_pwd=0 WHERE LOWER(username)=LOWER(?)",
            (h, username))

    def create_user(self, username, password, role="operator"):
        """Create an account that must change its password at first login.

        Returns:
            True when created; False when the name is already taken, compared
            case-insensitively because ``authenticate`` cannot tell ``Bob``
            from ``bob``.
        """
        h = _hash_password(password)
        with _db_lock:
            taken = self.conn.execute(
                "SELECT 1 FROM users WHERE LOWER(username)=LOWER(?)", (username,)
            ).fetchone()
            if taken:
                return False
            try:
                self.conn.execute(
                    "INSERT INTO users (username, password_hash, role, must_change_pwd) VALUES (?,?,?,1)",
                    (username, h, role))
                self.conn.commit()
            except sqlite3.IntegrityError:
                self.conn.rollback()
                return False
        return True

    def delete_user(self, username):
        """Delete *username*; the built-in ``admin`` account is never deleted."""
        self._write(
            "DELETE FROM users WHERE LOWER(username)=LOWER(?) AND LOWER(username) != 'admin'",
            (username,))

    def unlock_user(self, username):
        """Unlock *username* and reset its failed-attempt counter."""
        self._write(
            "UPDATE users SET locked=0, failed_attempts=0 WHERE LOWER(username)=LOWER(?)",
            (username,))

    def reset_password(self, username, new_password):
        """Set a new password, force a change at next login and unlock the account."""
        h = _hash_password(new_password)
        self._write(
            "UPDATE users SET password_hash=?, must_change_pwd=1, locked=0, failed_attempts=0 "
            "WHERE LOWER(username)=LOWER(?)",
            (h, username))

    def list_users(self):
        """Return every user row as a dict, ordered by username."""
        return [dict(r) for r in self._query("SELECT * FROM users ORDER BY username")]

    def log_action(self, username, action, details=""):
        """Append one entry to the audit log."""
        self._write("INSERT INTO audit_log (username, action, details) VALUES (?,?,?)",
                    (username, action, details))

    def get_logs(self, limit=500):
        """Return the *limit* most recent audit entries, newest first."""
        return [dict(r) for r in self._query(
            "SELECT * FROM audit_log ORDER BY id DESC LIMIT ?", (limit,))]
|
|
|
|
# ==============================================================================
|
|
# SPLUNK HEC — envoi de logs vers Splunk
|
|
# ==============================================================================
|
|
def send_to_splunk(settings, event_data):
    """Send one event to Splunk through the HTTP Event Collector (best effort).

    Args:
        settings: dict read for ``splunk_enabled``, ``splunk_url``,
            ``splunk_index``, ``proxy_url`` and the token
            (``_splunk_token_session`` if that key is present, otherwise
            ``splunk_token``).
        event_data: JSON-serialisable dict placed in the ``event`` field.

    Returns:
        None. Nothing is sent when Splunk is disabled or when the URL or token
        is missing. Every error is swallowed on purpose: a logging failure
        must never block patching.
    """
    enabled = settings.get("splunk_enabled")
    # Settings may hold a bool or a string; treat the usual "off" spellings as disabled.
    if not enabled or str(enabled).strip().lower() in ("false", "0", "no", "non", "off"):
        return
    url = str(settings.get("splunk_url") or "").strip()
    token = settings.get("_splunk_token_session", settings.get("splunk_token", ""))
    token = str(token or "").strip()  # tolerate None instead of crashing
    if not url or not token:
        return
    try:
        import urllib.request
        import ssl

        payload = json.dumps({
            "index": settings.get("splunk_index", "main"),
            "sourcetype": "patch_manager",
            "source": "sanef_patch_manager_v2",
            "host": socket.gethostname(),
            "event": event_data,
        }).encode("utf-8")
        req = urllib.request.Request(
            url.rstrip("/") + "/services/collector/event",
            data=payload,
            headers={
                "Authorization": f"Splunk {token}",
                "Content-Type": "application/json",
            },
            method="POST")

        proxy_url = settings.get("proxy_url", "")
        if proxy_url:
            # Through the proxy: default TLS verification applies.
            handler = urllib.request.ProxyHandler({"https": proxy_url, "http": proxy_url})
        else:
            # Direct connection: certificate verification is disabled.
            # The previous code set "ctx.verify_peer", which is not an
            # SSLContext attribute and therefore had no effect.
            # NOTE(security): the HEC token is sent over unauthenticated TLS
            # here; prefer trusting the internal CA instead.
            ctx = ssl.create_default_context()
            ctx.check_hostname = False  # must be cleared before CERT_NONE
            ctx.verify_mode = ssl.CERT_NONE
            handler = urllib.request.HTTPSHandler(context=ctx)
        opener = urllib.request.build_opener(handler)
        # Close the response explicitly so the socket is released.
        with opener.open(req, timeout=5) as response:
            response.read()
    except Exception:
        pass  # Silent by design: never block patching because of a log
|
|
|
|
# ==============================================================================
|
|
# WINDOW CENTERING — toujours sur le meme ecran
|
|
# ==============================================================================
|
|
def center_window(win, width=None, height=None, parent=None):
    """Centre *win* on the monitor of *parent* (or of the mouse cursor).

    Args:
        win: Tk window to place.
        width, height: size to apply; default to the window's current size.
        parent: optional window whose monitor is used. Without a parent the
            monitor under the cursor is used.

    On Windows the work area of the target monitor is obtained from user32.
    If that is unavailable or any call fails, the window is centred over the
    parent, or on the Tk screen when there is no parent.
    """
    win.update_idletasks()
    if width is None:
        width = win.winfo_width()
    if height is None:
        height = win.winfo_height()
    try:
        class POINT(ctypes.Structure):
            _fields_ = [("x", ctypes.c_long), ("y", ctypes.c_long)]

        class RECT(ctypes.Structure):
            _fields_ = [("left", ctypes.c_long), ("top", ctypes.c_long),
                        ("right", ctypes.c_long), ("bottom", ctypes.c_long)]

        class MONITORINFO(ctypes.Structure):
            _fields_ = [("cbSize", ctypes.c_ulong), ("rcMonitor", RECT),
                        ("rcWork", RECT), ("dwFlags", ctypes.c_ulong)]

        MONITOR_DEFAULTTONEAREST = 2

        # ctypes.WinDLL only exists on Windows: elsewhere this raises
        # AttributeError and the generic fallback below is used. A private
        # handle is loaded so the prototypes set here do not leak into the
        # process-wide ctypes.windll.user32 object.
        user32 = ctypes.WinDLL("user32")
        user32.GetCursorPos.argtypes = [ctypes.POINTER(POINT)]
        user32.GetCursorPos.restype = ctypes.c_int
        user32.MonitorFromPoint.argtypes = [POINT, ctypes.c_ulong]
        # HMONITOR is pointer-sized; the default c_int restype could truncate it.
        user32.MonitorFromPoint.restype = ctypes.c_void_p
        user32.GetMonitorInfoW.argtypes = [ctypes.c_void_p, ctypes.POINTER(MONITORINFO)]
        user32.GetMonitorInfoW.restype = ctypes.c_int

        if parent is not None:
            # Use the monitor containing the centre of the parent window.
            pt = POINT(parent.winfo_x() + parent.winfo_width() // 2,
                       parent.winfo_y() + parent.winfo_height() // 2)
        else:
            # Use the monitor under the mouse cursor.
            pt = POINT()
            if not user32.GetCursorPos(ctypes.byref(pt)):
                raise OSError("GetCursorPos failed")

        hmon = user32.MonitorFromPoint(pt, MONITOR_DEFAULTTONEAREST)
        mi = MONITORINFO()
        mi.cbSize = ctypes.sizeof(MONITORINFO)
        # A failed call leaves rcWork zeroed, which would push the window to
        # negative coordinates: treat it as an error and use the fallback.
        if not hmon or not user32.GetMonitorInfoW(hmon, ctypes.byref(mi)):
            raise OSError("GetMonitorInfoW failed")
        work = mi.rcWork
        sx = work.left + (work.right - work.left - width) // 2
        sy = work.top + (work.bottom - work.top - height) // 2
    except Exception:
        # Deliberately broad: any problem with the Win32 path degrades to a
        # plain Tk-based centring.
        if parent is not None:
            sx = parent.winfo_x() + (parent.winfo_width() - width) // 2
            sy = parent.winfo_y() + (parent.winfo_height() - height) // 2
        else:
            sx = (win.winfo_screenwidth() - width) // 2
            sy = (win.winfo_screenheight() - height) // 2
    win.geometry(f"{width}x{height}+{sx}+{sy}")
|
|
|
|
# ==============================================================================
|
|
# SETTINGS
|
|
# ==============================================================================
|
|
# Built-in settings; load_settings() overlays the values stored in
# SETTINGS_FILE on top of these.
DEFAULT_SETTINGS = {
    # Private key files tried by the direct SSH connection (see ssh_connect).
    "keyfile": r"C:\scripts\id_rsa_cybglobal.pem",
    "keyfile2": r"C:\scripts\id_rsa_cybsecope.ppk",
    # CyberArk login, target account and PSMP gateway: ssh_connect() builds the
    # gateway login as "<cybr_user>@<target_user>@<host>" and connects to "psmp".
    "cybr_user": "CYBP01336",
    "target_user": "cybsecope",
    "psmp": "psmp.sanef.fr",
    # Timeout passed to paramiko's connect() for direct SSH.
    "timeout": 20,
    # NOTE(review): presumably the number of servers handled concurrently - confirm.
    "parallelism": 3,
    "excel_file": "",
    "patcher": "",
    "vs_user": "",
    # Splunk HEC forwarding (see send_to_splunk); stored as the string "false"/"true".
    "splunk_enabled": "false",
    "splunk_url": "",
    "splunk_index": "main",
    "proxy_url": "http://proxy.sanef.fr:8080",
}
|
|
|
|
def load_settings():
    """Return the effective settings: defaults overlaid with ``SETTINGS_FILE``.

    A missing, unreadable or malformed file (including valid JSON that is not
    an object) is ignored and the defaults are returned. The result is always
    a new dict; ``DEFAULT_SETTINGS`` itself is never modified, so the built-in
    defaults stay available after a settings file has been loaded.
    """
    settings = dict(DEFAULT_SETTINGS)
    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
            stored = json.load(f)
    except (OSError, ValueError):
        # OSError: file absent/unreadable. ValueError: invalid JSON or encoding.
        return settings
    if isinstance(stored, dict):
        settings.update(stored)
    return settings
|
|
|
|
def save_settings(settings):
    """Persist *settings* to ``SETTINGS_FILE`` as JSON (best effort).

    Keys starting with ``_`` are treated as session-only and are not written:
    ``send_to_splunk`` reads the HEC token from ``_splunk_token_session``, and
    such a secret should not end up in a plain-text file.
    The file is written to a temporary sibling and then moved into place, so a
    crash or a full disk cannot leave a truncated settings file behind.
    Failures are ignored: not being able to save must not interrupt the GUI.
    """
    tmp_path = SETTINGS_FILE + ".tmp"
    try:
        persisted = {k: v for k, v in settings.items() if not str(k).startswith("_")}
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(persisted, f, indent=2)
        os.replace(tmp_path, SETTINGS_FILE)
    except Exception:
        # Deliberate best effort; just make sure no partial temp file remains.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
|
|
|
# ==============================================================================
|
|
# EXCEL READER
|
|
# ==============================================================================
|
|
def get_week_sheet(wb):
    """Return the name of the sheet matching the current ISO week, or None.

    The names tried, in this order, are ``S<n>``, ``S<nn>`` (zero-padded),
    ``Semaine <n>``, ``W<n>`` and ``<n>``. None tells the caller that no sheet
    matched.
    """
    week = date.today().isocalendar()[1]
    patterns = ("S{0}", "S{0:02d}", "Semaine {0}", "W{0}", "{0}")
    wanted = (pattern.format(week) for pattern in patterns)
    return next((name for name in wanted if name in wb.sheetnames), None)
|
|
|
|
# Excel fill colour (as returned by openpyxl's fgColor.rgb) that
# read_excel_servers() interprets as "server already patched".
EXCEL_GREEN = "FF00B050"
|
|
|
|
def _load_workbook_resilient(filepath):
    """Open *filepath* with openpyxl, working around OneDrive file locks.

    Three strategies are tried in turn: a temporary copy, an in-memory read,
    then a direct open (whose exception, if any, propagates to the caller).
    The temporary copy is always removed, whatever happens afterwards.
    """
    import io
    import shutil
    import tempfile

    # Strategy 1: work on a private copy of the file.
    try:
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = os.path.join(tmp_dir, os.path.basename(filepath))
            shutil.copy2(filepath, tmp_path)
            return openpyxl.load_workbook(tmp_path, data_only=True)
    except Exception:
        pass
    # Strategy 2: read the bytes ourselves and parse from memory.
    try:
        with open(filepath, "rb") as f:
            return openpyxl.load_workbook(io.BytesIO(f.read()), data_only=True)
    except Exception:
        pass
    # Strategy 3: direct open; let the error surface if this fails too.
    return openpyxl.load_workbook(filepath, data_only=True)


def _find_header_column(headers, keys):
    """Return the 0-based column matching one of *keys*, or None.

    *headers* maps lower-cased header text to a column index. Keys are tried
    in order; for each key an exact header wins over a whole-word match, which
    wins over a plain substring match. Without that ranking the key "os"
    would select a "hostname" column placed before the real "OS" column.
    """
    import re

    for key in keys:
        wanted = key.lower()
        if wanted in headers:
            return headers[wanted]
        whole_word = re.compile(r"(?<!\w)" + re.escape(wanted) + r"(?!\w)")
        for header, idx in headers.items():
            if whole_word.search(header):
                return idx
        for header, idx in headers.items():
            if wanted in header:
                return idx
    return None


def _cell_text(vals, col):
    """Return the stripped text of ``vals[col]``; "" if absent or empty."""
    if col is None or col >= len(vals):
        return ""
    value = vals[col]
    return str(value).strip() if value is not None else ""


def _parse_excel_date(raw):
    """Convert a cell value to a ``date`` (YYYY-MM-DD or DD/MM/YYYY text), else None."""
    if isinstance(raw, datetime):
        return raw.date()
    if isinstance(raw, date):
        return raw
    if not raw:
        return None
    text = str(raw).strip()[:10]
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None


def _parse_accord(raw):
    """Return "oui" only for an explicit agreement (True or the text "oui")."""
    if isinstance(raw, bool):
        return "oui" if raw else "non"
    if raw is None:
        return "non"
    return "oui" if str(raw).strip().lower() == "oui" else "non"


def _has_green_fill(cell):
    """Return True when *cell* is filled with the EXCEL_GREEN colour."""
    try:
        fill = cell.fill
        bg = fill.fgColor.rgb if fill and fill.fgColor else ""
        # rgb is not always a string (e.g. theme colours): treat that as "not green".
        return isinstance(bg, str) and bg.upper() == EXCEL_GREEN
    except Exception:
        return False


def read_excel_servers(filepath, sheet_name=None):
    """Read the patching workbook and return the Linux servers it lists.

    Args:
        filepath: path of the Excel workbook.
        sheet_name: sheet to read; defaults to the sheet of the current week
            (see ``get_week_sheet``).

    Returns:
        ``(servers, None)`` on success, where *servers* is a list of dicts;
        ``(None, sheetnames)`` when no sheet matches the current week, so the
        caller can ask the user; ``([], sheetnames)`` when no header row (at
        least three non-empty cells within the first five rows) is found.

    Only rows whose OS column contains "linux" and that have a server name are
    kept. Agreement is strict: a missing column or an empty cell means "non".
    A green fill on the server cell marks the server as already patched.
    """
    wb = _load_workbook_resilient(filepath)
    try:
        if sheet_name is None:
            sheet_name = get_week_sheet(wb)
        if sheet_name is None:
            return None, wb.sheetnames  # let the user pick a sheet

        ws = wb[sheet_name]

        # Header row = first row (within the first five) with >= 3 non-empty cells.
        headers = {}
        header_row = None
        for row_idx, row in enumerate(ws.iter_rows(max_row=5), start=1):
            named = [c for c in row if c.value is not None]
            if len(named) >= 3:
                headers = {str(c.value).strip().lower(): c.column - 1 for c in named}
                header_row = row_idx
                break
        if not headers:
            return [], wb.sheetnames

        col_server = _find_header_column(
            headers, ["asset name", "nom du serveur", "serveur", "hostname", "server", "nom"])
        col_os = _find_header_column(headers, ["os", "système", "systeme"])
        col_env = _find_header_column(headers, ["environnement", "environment", "env"])
        col_domain = _find_header_column(headers, ["domaine", "domain"])
        col_app = _find_header_column(headers, ["nom complet", "application", "nom_complet"])
        col_accord = _find_header_column(headers, ["accord"])
        col_date = _find_header_column(
            headers,
            ["date du patch", "date patch", "date heure", "date_heure", "date patching", "date"])
        col_patcher = _find_header_column(headers, ["intervenant", "patcheur", "technicien"])

        servers = []
        for row in ws.iter_rows(min_row=header_row + 1):
            cells = list(row)
            vals = [c.value for c in cells]
            if not vals or all(v is None for v in vals):
                continue

            os_val = _cell_text(vals, col_os)
            if "linux" not in os_val.lower():
                continue
            server_name = _cell_text(vals, col_server)
            if not server_name:
                continue

            date_val = None
            if col_date is not None and col_date < len(vals):
                date_val = _parse_excel_date(vals[col_date])

            # No agreement column, or a cell outside the row: never patch.
            if col_accord is None or col_accord >= len(vals):
                accord = "non"
            else:
                accord = _parse_accord(vals[col_accord])

            already_patched = (
                col_server is not None
                and col_server < len(cells)
                and _has_green_fill(cells[col_server])
            )

            servers.append({
                "server": server_name,
                "os": os_val,
                "env": _cell_text(vals, col_env),
                "domain": _cell_text(vals, col_domain),
                "app": _cell_text(vals, col_app),
                "accord": accord,
                "date_patch": date_val,
                "patcher": _cell_text(vals, col_patcher),
                "selected": False,
                "snap_done": False,
                "is_physical": False,
                "status": "✅ DÉJÀ PATCHÉ" if already_patched else "—",
                "patch_status": "PATCHED" if already_patched else "—",
                "patch_detail": "Fond vert Excel" if already_patched else "",
                "reboot_required": False,
                "already_patched": already_patched,
                "excel_row": cells[0].row if cells else None,
            })
        return servers, None
    finally:
        # Runs on every exit path, including the two early returns above.
        wb.close()
|
|
|
|
# ==============================================================================
|
|
# SSH HELPERS
|
|
# ==============================================================================
|
|
def build_fqdn(server, env):
    """Return the host names to try for *server*, in order.

    A name that already contains a dot is returned alone. Otherwise the short
    name is followed by its FQDN: ``<server>.sanef-rec.fr`` when *env*
    contains "rec" (which covers "recette"), ``<server>.sanef.groupe`` in all
    other cases.
    """
    if "." in server:
        return [server]
    suffix = "sanef-rec.fr" if "rec" in env.lower() else "sanef.groupe"
    return [server, f"{server}.{suffix}"]
|
|
|
|
def load_key(keyfile):
    """Load a private key with paramiko, or return None.

    Surrounding double then single quotes are removed from *keyfile* first.
    None is returned when the path is empty, does not exist, or cannot be
    parsed as an RSA, Ed25519 or ECDSA key (tried in that order).
    """
    if not keyfile:
        return None
    path = keyfile.strip('"').strip("'")
    if not os.path.exists(path):
        return None
    for key_class in (paramiko.RSAKey, paramiko.Ed25519Key, paramiko.ECDSAKey):
        try:
            return key_class.from_private_key_file(path)
        except Exception:
            continue
    return None
|
|
|
|
def ssh_connect(server, env, settings, pkey, pkey2, cyb_password=None):
    """Open an SSH session to *server*, trying each candidate host name.

    Args:
        server: short name or FQDN.
        env: environment label; it selects the DNS suffix (``build_fqdn``) and,
            when it contains "prod", the CyberArk route.
        settings: dict providing ``cybr_user``, ``target_user``, ``psmp`` and
            ``timeout``.
        pkey, pkey2: private keys tried in order for direct SSH (None skipped).
        cyb_password: CyberArk password; the PSMP route is used only when this
            is set and the environment is a production one.

    Returns:
        ``(client, hostname)`` for the first successful attempt, or
        ``(None, None)``. The client carries the host name in ``_eff_host``.

    Every failed attempt now closes its socket/transport/client instead of
    leaving it open, and the PSMP connection uses the configured timeout.
    """
    candidates = build_fqdn(server, env)
    # NOTE(review): "prod" also matches labels such as "preprod" - confirm intended.
    use_cyberark = "prod" in env.lower() and bool(cyb_password)
    keys = [k for k in (pkey, pkey2) if k]

    try:
        psmp_timeout = float(settings.get("timeout", 20))
    except (TypeError, ValueError):
        psmp_timeout = 20.0

    for hostname in candidates:
        if use_cyberark:
            # CyberArk PSMP: keyboard-interactive login "<cyberark>@<target>@<host>".
            username = f"{settings['cybr_user']}@{settings['target_user']}@{hostname}"
            password = cyb_password

            def handler(title, instructions, prompt_list):
                return [password] * len(prompt_list)

            sock = None
            transport = None
            try:
                # Connect the socket ourselves so the TCP connect honours a timeout.
                sock = socket.create_connection((settings["psmp"], 22), timeout=psmp_timeout)
                transport = paramiko.Transport(sock)
                # NOTE(security): the gateway host key is not verified here.
                transport.connect()
                transport.auth_interactive(username, handler)
                client = paramiko.SSHClient()
                client._transport = transport
                client._eff_host = hostname
                return client, hostname
            except Exception:
                if transport is not None:
                    transport.close()  # also closes the underlying socket
                elif sock is not None:
                    sock.close()
                continue
        else:
            # Direct SSH with each available key.
            for key in keys:
                client = paramiko.SSHClient()
                try:
                    # NOTE(security): unknown host keys are accepted automatically.
                    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                    client.connect(hostname=hostname, port=22,
                                   username=settings["target_user"],
                                   pkey=key, timeout=settings["timeout"],
                                   look_for_keys=False, allow_agent=False)
                    client._eff_host = hostname
                    return client, hostname
                except Exception:
                    client.close()
                    continue
    return None, None
|
|
|
|
def run_cmd(client, cmd, timeout=300):
    """Run *cmd* on *client* and return ``(stdout, stderr)`` as stripped text.

    Output is decoded as UTF-8 with undecodable bytes dropped; stdout is read
    completely before stderr. Any exception is reported as ``("", str(exc))``
    rather than raised.
    """
    def _drain(stream):
        return stream.read().decode("utf-8", errors="ignore").strip()

    try:
        _stdin, stdout, stderr = client.exec_command(cmd, timeout=timeout)
        return _drain(stdout), _drain(stderr)
    except Exception as exc:
        return "", str(exc)
|
|
|
|
def detect_physical(client):
    """Return True when the remote host looks like physical hardware.

    ``virt-what`` is asked first: any output other than "UNKNOWN" means a VM.
    Otherwise the DMI system manufacturer decides: a hypervisor vendor means
    VM, a known hardware vendor means physical. When nothing matches the host
    is assumed to be a VM.
    """
    virt, _ = run_cmd(client, "sudo virt-what 2>/dev/null || echo UNKNOWN", 10)
    if virt and virt != "UNKNOWN":
        return False  # virt-what identified a hypervisor

    vendor, _ = run_cmd(client, "sudo dmidecode -s system-manufacturer 2>/dev/null || echo UNKNOWN", 10)
    vendor = vendor.lower()
    vm_vendors = ("vmware", "virtualbox", "kvm", "xen", "qemu", "microsoft corporation")
    if any(name in vendor for name in vm_vendors):
        return False
    hardware_vendors = ("hp", "dell", "lenovo", "ibm", "supermicro", "cisco")
    # Unknown manufacturer -> False, i.e. treated as a VM.
    return any(name in vendor for name in hardware_vendors)
|
|
|
|
def build_yum_command(domain, packages, exclude_kernel, dryrun):
    """Build the yum command line to run on a server.

    Args:
        domain: business domain of the server; one containing "flux libre"
            uses the reduced exclude list.
        packages: space-separated package names; empty means a global update
            with the exclude lists applied.
        exclude_kernel: add ``--exclude=*kernel*`` to a global update.
        dryrun: build a filtered ``yum check-update`` instead of an update.

    Returns:
        The shell command as a string.

    Each package name is shell-quoted because the string may come from
    free-text input and ends up in a sudo command line. A *packages* value
    made only of whitespace is treated as empty: it previously produced an
    unrestricted ``yum update -y`` that bypassed every exclude.
    """
    import shlex

    noise_filter = "grep -vE '^$|^Load|Updat|kB|bps|00:|^Red|^EPEL|^Sub' | grep -E '^[a-z]'"
    pkg_args = " ".join(shlex.quote(name) for name in (packages or "").split())

    if pkg_args:
        # Targeted update: the exclude lists do not apply.
        if dryrun:
            return f"sudo yum check-update {pkg_args} 2>/dev/null | {noise_filter} | head -20"
        return f"sudo yum update -y {pkg_args} 2>&1 | tail -25"

    is_flux_libre = "flux libre" in (domain or "").lower()
    excludes = YUM_EXCLUDES_FLUX_LIBRE if is_flux_libre else YUM_EXCLUDES_STD
    if exclude_kernel:
        excludes += " --exclude=*kernel*"
    if dryrun:
        return f"sudo yum check-update {excludes} 2>/dev/null | {noise_filter} | head -30"
    return f"sudo yum update -y {excludes} 2>&1 | tail -25"
|
|
|
|
# ==============================================================================
|
|
# LOGIN DIALOG
|
|
# ==============================================================================
|
|
class LoginDialog(tk.Toplevel):
    """Modal login window.

    The constructor blocks until the window is closed. Afterwards ``result``
    holds the user row (a dict, as returned by ``db.authenticate``) or None
    when the user quit without logging in.
    """

    def __init__(self, parent, db):
        super().__init__(parent)
        self.db = db
        self.result = None  # user dict on success, None otherwise
        self.title("SANEF Patch Manager - Connexion")
        self.configure(bg="#1e1e2e")
        self.resizable(False, False)
        self.protocol("WM_DELETE_WINDOW", self._quit)
        self._build()
        center_window(self, 420, 340)
        self.grab_set()
        self.bind("<Return>", lambda e: self._login())
        self.wait_window()

    def _build(self):
        """Create the title, the two input fields, the message line and buttons."""
        BG = "#1e1e2e"; BG2 = "#2a2a3e"; FG = "#cdd6f4"; ACCENT = "#89b4fa"
        tk.Label(self, text="SANEF PATCH MANAGER", bg="#181825", fg=ACCENT,
                 font=("Consolas", 14, "bold"), pady=10).pack(fill="x")
        tk.Label(self, text="Authentification requise", bg=BG, fg="#6c7086",
                 font=("Consolas", 10)).pack(pady=(10, 15))

        f = tk.Frame(self, bg=BG)
        f.pack(pady=5)
        tk.Label(f, text="Utilisateur :", bg=BG, fg=FG, font=("Consolas", 11),
                 width=14, anchor="w").grid(row=0, column=0, padx=8, pady=6)
        self.user_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11),
                                   insertbackground=FG, width=22)
        self.user_entry.grid(row=0, column=1, padx=8, pady=6)
        self.user_entry.focus_set()

        tk.Label(f, text="Mot de passe :", bg=BG, fg=FG, font=("Consolas", 11),
                 width=14, anchor="w").grid(row=1, column=0, padx=8, pady=6)
        self.pwd_entry = tk.Entry(f, bg=BG2, fg=FG, font=("Consolas", 11),
                                  insertbackground=FG, width=22, show="*")
        self.pwd_entry.grid(row=1, column=1, padx=8, pady=6)

        # Error line, filled by _login().
        self.msg_label = tk.Label(self, text="", bg=BG, fg="#f38ba8",
                                  font=("Consolas", 10))
        self.msg_label.pack(pady=8)

        bf = tk.Frame(self, bg=BG)
        bf.pack(pady=10)
        tk.Button(bf, text="Connexion", bg="#40a02b", fg="white",
                  font=("Consolas", 11, "bold"), padx=20, pady=4,
                  command=self._login).pack(side="left", padx=8)
        tk.Button(bf, text="Quitter", bg="#313244", fg="#cdd6f4",
                  font=("Consolas", 10), padx=12, pady=4,
                  command=self._quit).pack(side="left", padx=8)

        tk.Label(self, text=f"SQATM Patch Manager v{VERSION}", bg=BG, fg="#45475a",
                 font=("Consolas", 9)).pack(side="bottom", pady=5)

    def _login(self):
        """Validate the credentials; close the dialog on success."""
        username = self.user_entry.get().strip()
        password = self.pwd_entry.get()
        if not username or not password:
            self.msg_label.configure(text="Saisir utilisateur et mot de passe")
            return
        user, error = self.db.authenticate(username, password)
        if error:
            self.msg_label.configure(text=error)
            self.pwd_entry.delete(0, "end")
            return
        # Authentication is case-insensitive: log the account name stored in
        # the database, not the spelling typed by the user, so all audit
        # entries of one account share the same username.
        self.db.log_action(user["username"], "LOGIN", f"role={user['role']}")
        self.result = user
        self.destroy()

    def _quit(self):
        """Close the dialog without a logged-in user."""
        self.result = None
        self.destroy()
|
|
|
|
|
|
# ==============================================================================
|
|
# CHANGE PASSWORD DIALOG
|
|
# ==============================================================================
|
|
class ChangePasswordDialog(tk.Toplevel):
    """Modal window asking for a new password, typed twice.

    The constructor blocks until the window is closed; ``success`` is then
    True when the password was changed. With ``forced=True`` the window close
    button is disabled.
    """

    def __init__(self, parent, db, username, forced=False):
        super().__init__(parent)
        self._parent = parent
        self.db = db
        self.username = username
        self.forced = forced
        self.success = False
        self.title("Changement de mot de passe")
        self.configure(bg="#1e1e2e")
        self.resizable(False, False)
        if forced:
            # Ignore the window-manager close request.
            self.protocol("WM_DELETE_WINDOW", lambda: None)
        self._build()
        center_window(self, 420, 300, parent=parent)
        self.grab_set()
        self.bind("<Return>", lambda e: self._change())
        self.wait_window()

    def _build(self):
        """Create the banner, both password fields, the message line and the button."""
        background, field_bg, foreground, accent = "#1e1e2e", "#2a2a3e", "#cdd6f4", "#89b4fa"
        if self.forced:
            banner = "Premier login : vous devez changer votre mot de passe"
        else:
            banner = "Changement de mot de passe"
        tk.Label(self, text=banner, bg="#181825", fg=accent,
                 font=("Consolas", 11, "bold"), pady=10).pack(fill="x")

        form = tk.Frame(self, bg=background)
        form.pack(pady=15)
        fields = []
        for line, caption in enumerate(("Nouveau mdp :", "Confirmer :")):
            tk.Label(form, text=caption, bg=background, fg=foreground, font=("Consolas", 10),
                     width=16, anchor="w").grid(row=line, column=0, padx=8, pady=6)
            field = tk.Entry(form, bg=field_bg, fg=foreground, font=("Consolas", 10),
                             insertbackground=foreground, width=22, show="*")
            field.grid(row=line, column=1, padx=8, pady=6)
            fields.append(field)
        self.new_entry, self.confirm_entry = fields
        self.new_entry.focus_set()

        # Error line, filled by _change().
        self.msg_label = tk.Label(self, text="", bg=background, fg="#f38ba8",
                                  font=("Consolas", 10))
        self.msg_label.pack(pady=5)

        tk.Button(self, text="Valider", bg="#40a02b", fg="white",
                  font=("Consolas", 11, "bold"), padx=20, pady=4,
                  command=self._change).pack(pady=10)

    def _change(self):
        """Validate both fields, store the new password and close the dialog."""
        new_pwd = self.new_entry.get()
        confirmation = self.confirm_entry.get()
        problem = None
        if len(new_pwd) < 4:
            problem = "Minimum 4 caracteres"
        elif new_pwd != confirmation:
            problem = "Les mots de passe ne correspondent pas"
        if problem is not None:
            self.msg_label.configure(text=problem)
            return
        self.db.change_password(self.username, new_pwd)
        self.db.log_action(self.username, "CHANGE_PASSWORD", "")
        self.success = True
        self.destroy()
|
|
|
|
|
|
# ==============================================================================
|
|
# DIALOGUE SETTINGS
|
|
# ==============================================================================
|
|
class SettingsDialog(tk.Toplevel):
    """Modal "Parametres" dialog working on a copy of the settings dict.

    The constructor blocks in ``wait_window`` until the dialog is closed.
    ``result`` is None when the dialog was cancelled or closed, otherwise it
    is the updated settings dict. Secrets typed in the dialog (vCenter
    password, Splunk HEC token) are returned in ``result`` only; they are
    never written to the settings file.
    """

    # Keys that may live in the in-memory settings but must never reach the
    # JSON file even though they lack the "_" session prefix. "splunk_token"
    # is copied into the settings by _save(), while the UI promises
    # "(non stocke)".
    _NEVER_PERSISTED = frozenset({"splunk_token"})

    def __init__(self, parent, settings):
        super().__init__(parent)
        self._parent = parent
        self.title("Parametres")
        self.configure(bg="#1e1e2e")
        self.resizable(True, True)
        self.settings = dict(settings)  # edit a copy; the caller's dict is untouched
        self.result = None
        self._build()
        # Height adapted to the screen: at most 780 px or 85 % of the screen height.
        sh = parent.winfo_screenheight()
        h = min(780, int(sh * 0.85))
        center_window(self, 600, h, parent=parent)
        self.grab_set()
        self.wait_window()

    def _build(self):
        """Create the fixed title and the scrollable form with all setting rows."""
        BG = "#1e1e2e"
        BG2 = "#2a2a3e"
        FG = "#cdd6f4"
        ACCENT = "#89b4fa"

        # Fixed title at the top
        tk.Label(self, text="PARAMETRES", bg="#313244", fg=ACCENT,
                 font=("Consolas",13,"bold"), pady=8).pack(fill="x")

        # Scrollable area
        canvas = tk.Canvas(self, bg=BG, highlightthickness=0)
        scrollbar = ttk.Scrollbar(self, orient="vertical", command=canvas.yview)
        self.scroll_frame = tk.Frame(canvas, bg=BG)
        self.scroll_frame.bind(
            "<Configure>",
            lambda e: canvas.configure(scrollregion=canvas.bbox("all")))
        canvas.create_window((0, 0), window=self.scroll_frame, anchor="nw")
        canvas.configure(yscrollcommand=scrollbar.set)
        canvas.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")
        # Mouse-wheel scrolling. The binding is attached to this Toplevel, not
        # installed with bind_all: every widget of the dialog carries its
        # toplevel in its bindtags, so the handler still fires for the whole
        # form, but it neither replaces application-wide wheel bindings nor
        # survives the dialog (a leftover bind_all handler would keep calling
        # a destroyed canvas).
        self.bind("<MouseWheel>",
                  lambda e: canvas.yview_scroll(int(-1*(e.delta/120)), "units"))
        sf = self.scroll_frame

        def section(title):
            """Add a bold section heading to the form."""
            tk.Label(sf, text=title, bg=BG, fg=ACCENT,
                     font=("Consolas",10,"bold")).pack(anchor="w", padx=20, pady=(12,2))

        def row(label, key, default=""):
            """Add a caption + entry line bound to ``self.var_<key>`` and return the variable."""
            f = tk.Frame(sf, bg=BG)
            f.pack(fill="x", padx=20, pady=4)
            tk.Label(f, text=label, bg=BG, fg=FG, font=("Consolas",10),
                     width=22, anchor="w").pack(side="left")
            var = tk.StringVar(value=self.settings.get(key, default))
            setattr(self, f"var_{key}", var)
            tk.Entry(f, textvariable=var, bg=BG2, fg=FG,
                     font=("Consolas",10), insertbackground=FG,
                     width=32).pack(side="left", padx=4)
            return var

        def secret_row(label):
            """Add a masked, never pre-filled entry line and return its variable."""
            f = tk.Frame(sf, bg=BG)
            f.pack(fill="x", padx=20, pady=4)
            tk.Label(f, text=label, bg=BG, fg=FG,
                     font=("Consolas",10), width=22, anchor="w").pack(side="left")
            var = tk.StringVar(value="")  # never pre-filled
            tk.Entry(f, textvariable=var, show="*",
                     bg=BG2, fg=FG, font=("Consolas",10),
                     insertbackground=FG, width=32).pack(side="left", padx=4)
            tk.Label(f, text="(non stocke)", bg=BG, fg="#6c7086",
                     font=("Consolas",8)).pack(side="left", padx=4)
            return var

        section("SSH — Recette")
        row("Cle SSH principale :", "keyfile")
        row("Cle SSH secondaire :", "keyfile2")

        section("CyberArk — Production")
        row("Compte CyberArk :", "cybr_user", "CYBP01336")
        row("Utilisateur cible :", "target_user", "cybsecope")
        row("PSMP hostname :", "psmp", "psmp.sanef.fr")

        section("Identite patcheur")
        row("Nom du patcheur :", "patcher", "")

        section("vSphere — Snapshots")
        row("Utilisateur vCenter :", "vs_user", "")
        self.var_vs_pwd = secret_row("Mot de passe vCenter :")

        section("Connexion SSH")
        row("Timeout SSH (s) :", "timeout", "20")
        row("Parallelisme :", "parallelism", "3")

        section("Splunk HEC (logs)")
        self.var_splunk_enabled = tk.BooleanVar(
            value=self.settings.get("splunk_enabled", "false") == "true")
        tk.Checkbutton(sf, text="Activer l'envoi vers Splunk",
                       variable=self.var_splunk_enabled,
                       bg=BG, fg=FG, selectcolor=BG2, activebackground=BG,
                       font=("Consolas",10)).pack(anchor="w", padx=20)
        row("URL HEC :", "splunk_url", "https://splunk.sanef.fr:8088")
        # Token is not stored in the JSON file: session only
        self.var_splunk_token = secret_row("Token HEC :")
        row("Index :", "splunk_index", "main")
        row("Proxy (optionnel) :", "proxy_url", "http://proxy.sanef.fr:8080")

        # Buttons at the bottom of the scrollable form
        bf = tk.Frame(sf, bg=BG)
        bf.pack(pady=16)
        tk.Button(bf, text="Sauvegarder", bg="#40a02b", fg="white",
                  font=("Consolas",11,"bold"), padx=12, pady=6,
                  command=self._save).pack(side="left", padx=8)
        tk.Button(bf, text="Annuler", bg="#313244", fg="#cdd6f4",
                  font=("Consolas",10), padx=12, pady=6,
                  command=self.destroy).pack(side="left", padx=8)

    def _persistable_settings(self):
        """Return the subset of ``self.settings`` that may be written to disk.

        Session-only entries (keys starting with "_") and the keys listed in
        ``_NEVER_PERSISTED`` are left out.
        """
        return {k: v for k, v in self.settings.items()
                if not k.startswith("_") and k not in self._NEVER_PERSISTED}

    def _save(self):
        """Copy the form into ``self.settings``, persist the non-secret part and close.

        ``result`` receives the full in-memory dict, session secrets
        included; only the output of ``_persistable_settings`` is handed to
        ``save_settings``.
        """
        keys = ["keyfile","keyfile2","cybr_user","target_user","psmp","timeout","parallelism",
                "patcher","vs_user","splunk_url","splunk_token","splunk_index","proxy_url"]
        for k in keys:
            var = getattr(self, f"var_{k}", None)
            if var is None:
                continue
            val = var.get().strip()
            if k in ("timeout","parallelism"):
                try:
                    val = int(val)
                except ValueError:
                    # Not a number: fall back to the application default.
                    val = DEFAULT_SETTINGS.get(k, 20)
            self.settings[k] = val
        # Splunk enabled flag (stored in the JSON file, not sensitive)
        if hasattr(self, "var_splunk_enabled"):
            self.settings["splunk_enabled"] = "true" if self.var_splunk_enabled.get() else "false"
        # vCenter password and Splunk token: session only, never in the JSON file
        if hasattr(self, "var_vs_pwd"):
            self.settings["_vs_pwd_session"] = self.var_vs_pwd.get()
        if hasattr(self, "var_splunk_token"):
            self.settings["_splunk_token_session"] = self.var_splunk_token.get()
        self.result = self.settings
        # Persist WITHOUT the secrets
        save_settings(self._persistable_settings())
        self.destroy()
|
|
|
|
# ==============================================================================
|
|
# DIALOGUE CONFIRMATION COMMANDE YUM
|
|
# ==============================================================================
|
|
class YumConfirmDialog(tk.Toplevel):
    """Modal confirmation of the patch command about to run on one server.

    The constructor blocks in ``wait_window`` until the dialog is closed.
    Afterwards ``result`` is "apply", "apply_all" or "cancel" when a button
    was pressed, and None when the window was closed through the window
    manager. ``final_cmd`` holds the (possibly edited) command text when a
    button was pressed and stays None otherwise.
    """

    # Class-level default so the attribute always exists. It used to be
    # created only in _set(), so closing the window without pressing a
    # button left callers with an AttributeError.
    final_cmd = None

    def __init__(self, parent, server, cmd):
        super().__init__(parent)
        self.title("Confirmation commande patch")
        self.configure(bg="#1e1e2e")
        self.result = None  # "apply", "apply_all", "cancel"
        self.cmd_var = tk.StringVar(value=cmd)
        self._build(server, cmd)
        center_window(self, 680, 340, parent=parent)
        self.grab_set()
        self.wait_window()

    def _build(self, server, cmd):
        """Create the header, the editable command box and the three action buttons."""
        BG = "#1e1e2e"
        FG = "#cdd6f4"
        ACCENT = "#89b4fa"
        YELLOW = "#f9e2af"

        tk.Label(self, text="⚠️ CONFIRMATION COMMANDE PATCH",
                 bg="#313244", fg=ACCENT, font=("Consolas",12,"bold"),
                 pady=8).pack(fill="x")

        tk.Label(self, text=f"Serveur : {server}",
                 bg=BG, fg=YELLOW, font=("Consolas",11)).pack(anchor="w", padx=16, pady=6)

        tk.Label(self, text="Commande à exécuter :",
                 bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", padx=16)

        # Editable multi-line box pre-filled with the proposed command.
        cmd_entry = tk.Text(self, height=5, bg="#11111b", fg="#a6e3a1",
                            font=("Consolas",10), wrap="word",
                            insertbackground=FG)
        cmd_entry.insert("1.0", cmd)
        cmd_entry.pack(fill="x", padx=16, pady=4)
        self._cmd_text = cmd_entry

        bf = tk.Frame(self, bg=BG)
        bf.pack(pady=12)

        tk.Button(bf, text="▶ Appliquer",
                  bg="#40a02b", fg="white", font=("Consolas",11,"bold"),
                  padx=10, pady=6,
                  command=lambda: self._set("apply")).pack(side="left", padx=6)

        tk.Button(bf, text="▶▶ Appliquer à TOUS",
                  bg="#1e66f5", fg="white", font=("Consolas",11,"bold"),
                  padx=10, pady=6,
                  command=lambda: self._set("apply_all")).pack(side="left", padx=6)

        tk.Button(bf, text="✗ Annuler ce serveur",
                  bg="#d20f39", fg="white", font=("Consolas",10),
                  padx=10, pady=6,
                  command=lambda: self._set("cancel")).pack(side="left", padx=6)

    def _set(self, val):
        """Record the chosen action and the current command text, then close."""
        self.result = val
        # Read the text before destroy(): the widget is gone afterwards.
        self.final_cmd = self._cmd_text.get("1.0", "end").strip()
        self.destroy()
|
|
|
|
# ==============================================================================
|
|
# APPLICATION PRINCIPALE
|
|
# ==============================================================================
|
|
class PatchManagerV2:
|
|
def __init__(self, root, db, current_user):
    """Set up the main window, the runtime state and the whole UI.

    ``current_user`` is a dict with at least "username" and "role"; both
    are shown in the window title. After the state is initialised the UI is
    built, the SSH keys are (re)loaded and an APP_START audit entry is
    written.
    """
    self.root = root
    self.db = db
    self.current_user = current_user  # {"username":..., "role":...}

    who = current_user['username']
    role = current_user['role']
    self.root.title(f"SANEF Linux Patch Manager v{VERSION} — {who} ({role})")
    self.root.configure(bg="#1e1e2e")

    # Full screen on the monitor that hosts the window; if centring fails,
    # fall back to maximising immediately.
    try:
        screen_w = root.winfo_screenwidth()
        screen_h = root.winfo_screenheight()
        center_window(root, screen_w, screen_h, parent=root)
        root.after(100, lambda: root.state("zoomed"))
    except Exception:
        root.state("zoomed")

    # Runtime state
    self.settings = load_settings()
    self.servers = []
    self.results = []
    self.running = False
    self.pkey = None
    self.pkey2 = None
    self.cyb_pwd = None
    self.apply_all_cmd = False
    self.is_current_week = True  # True = current week (patching allowed)

    self._build_ui()
    self._reload_keys()
    self.audit("APP_START", "")
|
|
|
|
# ==========================================================================
|
|
# COULEURS
|
|
# ==========================================================================
|
|
# Colour palette shared by every tab (hex RGB strings passed to Tk).
BG = "#1e1e2e"      # main background
BG2 = "#2a2a3e"     # entry / table background
FG = "#cdd6f4"      # default text
ACCENT = "#89b4fa"  # headings and highlights
GREEN = "#a6e3a1"   # "ok" tags
RED = "#f38ba8"     # "ko" tags
YELLOW = "#f9e2af"  # "warn" tags
ORANGE = "#fab387"
BTN = "#313244"     # neutral button / header background
|
|
|
|
# ==========================================================================
|
|
# AUDIT HELPER
|
|
# ==========================================================================
|
|
def audit(self, action, details=""):
    """Record ``action`` for the current user in the database and forward it to Splunk.

    The database write happens synchronously; ``send_to_splunk`` is run in
    a daemon thread so the caller is not blocked by it.
    """
    who = self.current_user["username"]
    self.db.log_action(who, action, details)

    event = {
        "action": action, "user": who, "details": details,
        "timestamp": datetime.now().isoformat(),
    }
    worker = threading.Thread(target=send_to_splunk,
                              args=(self.settings, event),
                              daemon=True)
    worker.start()
|
|
|
|
# ==========================================================================
|
|
# BUILD UI
|
|
# ==========================================================================
|
|
def _build_ui(self):
    """Create the ttk styles, the title bar and the notebook with its tabs.

    The "Audit" and "Utilisateurs" tabs are only added for the "admin"
    role; for the "viewer" role the Patch and Post-patching tabs are added
    but disabled.
    """
    bg, fg, accent, btn = self.BG, self.FG, self.ACCENT, self.BTN
    is_admin = self.current_user["role"] == "admin"

    # ttk styles
    style = ttk.Style()
    style.theme_use("clam")
    style.configure("TFrame", background=self.BG)
    style.configure("TLabel", background=self.BG, foreground=self.FG,
                    font=("Consolas",10))
    style.configure("Treeview", background=self.BG2, foreground=self.FG,
                    fieldbackground=self.BG2, font=("Consolas",9), rowheight=22)
    style.configure("Treeview.Heading", background=self.BTN,
                    foreground=self.ACCENT, font=("Consolas",10,"bold"))
    style.map("Treeview", background=[("selected","#45475a")])
    style.configure("TCombobox", fieldbackground=self.BG2, foreground=self.FG,
                    background=self.BG2, font=("Consolas",10))
    style.configure("TCheckbutton", background=self.BG, foreground=self.FG,
                    font=("Consolas",10))

    # Title bar: product name and date on the left, action buttons on the right
    title_bar = tk.Frame(self.root, bg="#181825", pady=6)
    title_bar.pack(fill="x")
    tk.Label(title_bar, text=f"⚡ SANEF LINUX PATCH MANAGER v{VERSION}",
             bg="#181825", fg=accent,
             font=("Consolas",14,"bold")).pack(side="left", padx=12)
    tk.Label(title_bar,
             text=f"Semaine {date.today().isocalendar()[1]} | {date.today():%d/%m/%Y}",
             bg="#181825", fg="#6c7086",
             font=("Consolas",10)).pack(side="left", padx=12)

    # (caption, text colour, font, outer padx, callback), packed right to left
    title_buttons = [
        ("A propos", "#6c7086", ("Consolas",9), 4, self._show_about),
        ("Mon MDP", "#fab387", ("Consolas",9), 4, self._change_my_password),
        ("Settings", fg, ("Consolas",10), 12, self._open_settings),
    ]
    for caption, colour, font, gap, callback in title_buttons:
        tk.Button(title_bar, text=caption, bg=btn, fg=colour,
                  font=font, pady=2,
                  command=callback).pack(side="right", padx=gap)

    # Notebook (tabs)
    nb_style = ttk.Style()
    nb_style.configure("TNotebook", background=self.BG, borderwidth=0)
    nb_style.configure("TNotebook.Tab", background=self.BTN, foreground=self.FG,
                       font=("Consolas",10,"bold"), padding=[12,4])
    nb_style.map("TNotebook.Tab",
                 background=[("selected","#313244")],
                 foreground=[("selected",self.ACCENT)])

    self.nb = ttk.Notebook(self.root)
    self.nb.pack(fill="both", expand=True, padx=6, pady=4)

    # Tab frames: the audit frame is always created, but only shown to admins
    self.tab_servers = tk.Frame(self.nb, bg=bg)
    self.tab_patch = tk.Frame(self.nb, bg=bg)
    self.tab_post = tk.Frame(self.nb, bg=bg)
    self.tab_report = tk.Frame(self.nb, bg=bg)
    self.tab_audit = tk.Frame(self.nb, bg=bg)

    common_tabs = [
        (self.tab_servers, "1. Selection serveurs"),
        (self.tab_patch, "2. Patch"),
        (self.tab_post, "3. Post-patching"),
        (self.tab_report, "4. Rapport"),
    ]
    for frame, caption in common_tabs:
        self.nb.add(frame, text=caption)
    if is_admin:
        self.nb.add(self.tab_audit, text="5. Audit")
        self.tab_users = tk.Frame(self.nb, bg=bg)
        self.nb.add(self.tab_users, text="6. Utilisateurs")

    self._build_tab_servers()
    self._build_tab_patch()
    self._build_tab_post()
    self._build_tab_report()
    if is_admin:
        self._build_tab_audit()
        self._build_tab_users()

    # Viewer role: Patch and Post-patching tabs are disabled
    if self.current_user["role"] == "viewer":
        for locked in (self.tab_patch, self.tab_post):
            self.nb.tab(locked, state="disabled")
|
|
|
|
# ==========================================================================
|
|
# ONGLET 1 — SÉLECTION SERVEURS
|
|
# ==========================================================================
|
|
def _build_tab_servers(self):
    """Build tab 1: Excel source, filters, selection helpers and server table.

    Creates, top to bottom: the Excel file / sheet / week-navigation line,
    the filter line (the "Intervenant" filter only for the "admin" role),
    the selection shortcut buttons with the counter label, the server
    Treeview with both scrollbars, and the prerequisites / snapshot / "go
    to patch" button line.
    """
    tab = self.tab_servers
    BG = self.BG
    BG2 = self.BG2
    FG = self.FG
    ACCENT = self.ACCENT
    BTN = self.BTN

    # ── Line 1: Excel file ────────────────────────────────────────────────
    ef = tk.Frame(tab, bg=BG)
    ef.pack(fill="x", padx=10, pady=6)

    tk.Label(ef, text="📊 Fichier Excel :", bg=BG, fg=FG,
             font=("Consolas",10)).pack(side="left")
    self.excel_var = tk.StringVar(value=self.settings.get("excel_file",""))
    tk.Entry(ef, textvariable=self.excel_var, bg=BG2, fg=FG,
             font=("Consolas",10), insertbackground=FG,
             width=55).pack(side="left", padx=6)
    tk.Button(ef, text="Parcourir", bg=BTN, fg=FG, font=("Consolas",9),
              command=self._browse_excel).pack(side="left", padx=2)

    # Sheet selector
    tk.Label(ef, text="Feuille :", bg=BG, fg=FG,
             font=("Consolas",10)).pack(side="left", padx=(12,2))
    self.sheet_var = tk.StringVar()
    self.sheet_cb = ttk.Combobox(ef, textvariable=self.sheet_var,
                                 state="readonly", width=10)
    self.sheet_cb.pack(side="left", padx=2)
    self.sheet_cb.bind("<<ComboboxSelected>>", lambda e: self._load_sheet())

    # Quick week navigation
    tk.Button(ef, text="◀", bg=BTN, fg=FG, font=("Consolas",10),
              width=2, command=self._prev_week).pack(side="left", padx=1)
    tk.Button(ef, text="▶", bg=BTN, fg=FG, font=("Consolas",10),
              width=2, command=self._next_week).pack(side="left", padx=1)
    tk.Label(ef, text="S. courante", bg=BG, fg="#6c7086",
             font=("Consolas",9)).pack(side="left", padx=2)
    tk.Button(ef, text="↺", bg=BTN, fg=self.ACCENT, font=("Consolas",10),
              width=2, command=self._goto_current_week).pack(side="left", padx=1)

    tk.Button(ef, text="Charger", bg="#1e66f5", fg="white",
              font=("Consolas",10,"bold"), pady=3,
              command=self._load_excel).pack(side="left", padx=8)

    # Current-week / view-only indicator (text is set elsewhere)
    self.week_mode_lbl = tk.Label(ef, text="", bg=self.BG,
                                  font=("Consolas",10,"bold"))
    self.week_mode_lbl.pack(side="left", padx=8)

    # ── Line 2: filters ───────────────────────────────────────────────────
    ff = tk.Frame(tab, bg=BG)
    ff.pack(fill="x", padx=10, pady=4)

    tk.Label(ff, text="Filtres :", bg=BG, fg=ACCENT,
             font=("Consolas",10,"bold")).pack(side="left")

    # Today only
    self.filter_today_var = tk.BooleanVar(value=False)
    tk.Checkbutton(ff, text="📅 Aujourd'hui uniquement",
                   variable=self.filter_today_var,
                   bg=BG, fg=self.YELLOW, selectcolor=BG2,
                   activebackground=BG,
                   font=("Consolas",10)).pack(side="left", padx=8)

    # "Intervenant" filter: visible for admins only
    self.filter_patcher_var = tk.StringVar(value="Tous")
    if self.current_user["role"] == "admin":
        tk.Label(ff, text="Intervenant :", bg=BG, fg=self.YELLOW,
                 font=("Consolas",10,"bold")).pack(side="left", padx=(8,2))
        self.filter_patcher_cb = ttk.Combobox(ff, textvariable=self.filter_patcher_var,
                                              state="readonly", width=14)
        self.filter_patcher_cb.pack(side="left", padx=2)
        self.filter_patcher_cb.bind("<<ComboboxSelected>>",
                                    lambda e: self._apply_filters())
    else:
        self.filter_patcher_cb = None  # non-admin: no "Intervenant" filter

    # Each entry creates self.<attr>_var and self.<attr>_cb
    for label, attr in [("Env", "filter_env"), ("Domaine", "filter_domain"),
                        ("Application", "filter_app")]:
        tk.Label(ff, text=f"{label}:", bg=BG, fg=FG,
                 font=("Consolas",10)).pack(side="left", padx=(8,2))
        var = tk.StringVar(value="Tous")
        setattr(self, f"{attr}_var", var)
        cb = ttk.Combobox(ff, textvariable=var, state="readonly", width=14)
        setattr(self, f"{attr}_cb", cb)
        cb.pack(side="left", padx=2)

    tk.Button(ff, text="Rechercher", bg=BTN, fg=ACCENT,
              font=("Consolas",10,"bold"), pady=3,
              command=self._apply_filters).pack(side="left", padx=6)

    # ── Line 3: selection shortcuts ───────────────────────────────────────
    af = tk.Frame(tab, bg=BG)
    af.pack(fill="x", padx=10, pady=2)

    for txt, cmd in [("✓ Tout (accordé)", self._select_all_ok),
                     ("✗ Aucun", self._select_none),
                     ("↕ Inverser", self._invert)]:
        tk.Button(af, text=txt, bg=BTN, fg=FG, font=("Consolas",9),
                  command=cmd).pack(side="left", padx=3)

    self.count_lbl = tk.Label(af, text="", bg=BG, fg=self.YELLOW,
                              font=("Consolas",9))
    self.count_lbl.pack(side="right", padx=10)

    # ── Server table ──────────────────────────────────────────────────────
    tree_frame = tk.Frame(tab, bg=BG)
    tree_frame.pack(fill="both", expand=True, padx=10, pady=4)

    cols = ("sel","accord","serveur","intervenant","env","domaine","app","date_patch","snap","statut")
    self.srv_tree = ttk.Treeview(tree_frame, columns=cols,
                                 show="headings", height=22)

    hdrs = {"sel":"✓","accord":"Accord","serveur":"Serveur",
            "intervenant":"Intervenant","env":"Env",
            "domaine":"Domaine","app":"Application","date_patch":"Date patch",
            "snap":"Snapshot","statut":"Statut"}
    widths = {"sel":30,"accord":70,"serveur":180,"intervenant":90,"env":80,"domaine":120,
              "app":150,"date_patch":90,"snap":80,"statut":100}
    centered = ("sel","accord","snap","statut")

    for c in cols:
        self.srv_tree.heading(c, text=hdrs[c])
        self.srv_tree.column(c, width=widths[c],
                             anchor="center" if c in centered else "w")

    self.srv_tree.tag_configure("no_accord", foreground="#585b70", background="#1e1e2e")
    self.srv_tree.tag_configure("ok", foreground=self.GREEN)
    self.srv_tree.tag_configure("warn", foreground=self.YELLOW)
    self.srv_tree.tag_configure("ko", foreground=self.RED)
    self.srv_tree.tag_configure("selected", foreground=self.FG, background="#313244")
    self.srv_tree.tag_configure("normal", foreground=self.FG)
    self.srv_tree.tag_configure("already_patched",foreground="#1e1e2e", background="#40a02b")

    vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.srv_tree.yview)
    hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.srv_tree.xview)
    self.srv_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
    # Grid layout inside tree_frame. With the former pack order (table on the
    # left first, then the vertical bar, then the horizontal bar at the
    # bottom) the horizontal bar only received the leftover sliver between
    # the table and the vertical bar instead of spanning under the table.
    tree_frame.rowconfigure(0, weight=1)
    tree_frame.columnconfigure(0, weight=1)
    self.srv_tree.grid(row=0, column=0, sticky="nsew")
    vsb.grid(row=0, column=1, sticky="ns")
    hsb.grid(row=1, column=0, sticky="ew")
    self.srv_tree.bind("<ButtonRelease-1>", self._toggle_srv)

    # ── Prerequisites + snapshot buttons ──────────────────────────────────
    prereq_frame = tk.Frame(tab, bg=BG)
    prereq_frame.pack(fill="x", padx=10, pady=4)

    self.prereq_btn = tk.Button(prereq_frame, text="Verifier prerequis",
                                bg="#e64553", fg="white", font=("Consolas",12,"bold"),
                                pady=6, command=self._check_prerequisites)
    self.prereq_btn.pack(side="left", padx=4)

    self.prereq_status = tk.Label(prereq_frame, text="", bg=BG, fg=self.FG,
                                  font=("Consolas",10))
    self.prereq_status.pack(side="left", padx=12)

    # Created disabled; enabling it is done elsewhere.
    self.snap_btn = tk.Button(prereq_frame, text="Prendre snapshot vSphere",
                              bg="#313244", fg="#6c7086", font=("Consolas",10),
                              pady=4, state="disabled", command=self._take_snapshots)
    self.snap_btn.pack(side="left", padx=4)

    tk.Button(prereq_frame,
              text="▶ Aller au Patch →",
              bg="#40a02b", fg="white", font=("Consolas",11,"bold"),
              pady=5,
              command=lambda: self.nb.select(self.tab_patch)).pack(side="right", padx=4)
|
|
|
|
# ==========================================================================
|
|
# ONGLET 2 — PATCH
|
|
# ==========================================================================
|
|
def _build_tab_patch(self):
    """Build tab 2: patch options on the left, live log on the right.

    The left column is a scrollable canvas holding the preset selector, the
    package / exclusion entries, the editable final command, the kernel and
    dry-run checkboxes, the CyberArk password entry and the GO / STOP
    buttons. The right column is the read-only log with its colour tags. A
    progress label and bar sit below both columns.
    """
    tab = self.tab_patch
    BG = self.BG
    BG2 = self.BG2
    FG = self.FG
    ACCENT = self.ACCENT
    BTN = self.BTN

    top = tk.Frame(tab, bg=BG)
    top.pack(fill="both", expand=True, padx=10, pady=8)

    # Left column: options (scrollable)
    left_canvas = tk.Canvas(top, bg=BG, highlightthickness=0, width=380)
    left_sb = ttk.Scrollbar(top, orient="vertical", command=left_canvas.yview)
    left = tk.Frame(left_canvas, bg=BG)
    left.bind("<Configure>",
              lambda e: left_canvas.configure(scrollregion=left_canvas.bbox("all")))
    left_canvas.create_window((0, 0), window=left, anchor="nw")
    left_canvas.configure(yscrollcommand=left_sb.set)
    left_canvas.pack(side="left", fill="y")
    left_sb.pack(side="left", fill="y")

    panel_path = str(left_canvas)

    def _scroll_options(event):
        """Scroll the options panel, but only for wheel events that occur inside it."""
        # bind_all delivers wheel events from the whole application. Without
        # this check the panel also scrolled while the wheel was used over
        # the log pane or over widgets of the other tabs.
        source = str(event.widget)
        if source == panel_path or source.startswith(panel_path + "."):
            left_canvas.yview_scroll(int(-1*(event.delta/120)), "units")

    left_canvas.bind_all("<MouseWheel>", _scroll_options)

    tk.Label(left, text="OPTIONS PATCH", bg=BTN, fg=ACCENT,
             font=("Consolas",11,"bold"), padx=8, pady=4).pack(fill="x")

    # Package preset
    tk.Label(left, text="Preset packages A PATCHER :", bg=BG, fg=FG,
             font=("Consolas",10)).pack(anchor="w", pady=(8,2))
    preset_names = list(PRESET_PACKAGES.keys())
    self.preset_var = tk.StringVar(value=preset_names[0])
    preset_cb = ttk.Combobox(left, textvariable=self.preset_var,
                             values=preset_names,
                             state="readonly", width=30)
    preset_cb.pack(anchor="w")
    preset_cb.bind("<<ComboboxSelected>>", self._on_preset)

    # Packages to patch
    tk.Label(left, text="Packages specifiques a PATCHER :", bg=BG, fg=FG,
             font=("Consolas",10)).pack(anchor="w", pady=(8,2))
    self.packages_var = tk.StringVar(value="")
    self.packages_entry = tk.Entry(left, textvariable=self.packages_var, bg=BG2, fg=FG,
                                   font=("Consolas",10), insertbackground=FG, width=32)
    self.packages_entry.pack(anchor="w")
    tk.Label(left, text="Ex: gnutls libsoup rhc",
             bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w")

    # Packages to exclude
    tk.Label(left, text="Packages specifiques a EXCLURE :", bg=BG, fg=FG,
             font=("Consolas",10)).pack(anchor="w", pady=(8,2))
    self.custom_excludes_var = tk.StringVar(value="")
    self.custom_excludes_entry = tk.Entry(left, textvariable=self.custom_excludes_var,
                                          bg=BG2, fg=FG,
                                          font=("Consolas",10), insertbackground=FG, width=32)
    self.custom_excludes_entry.pack(anchor="w")
    tk.Label(left, text="Ex: kernel* podman* docker*",
             bg=BG, fg="#6c7086", font=("Consolas",9)).pack(anchor="w")

    # Editable final command
    tk.Label(left, text="Commande finale (editable) :", bg=BG, fg=self.YELLOW,
             font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2))
    self.final_cmd_var = tk.StringVar(value="")
    self.final_cmd_entry = tk.Entry(left, textvariable=self.final_cmd_var,
                                    bg="#181825", fg=self.YELLOW, font=("Consolas",9),
                                    insertbackground=self.YELLOW, width=48)
    self.final_cmd_entry.pack(anchor="w")
    tk.Button(left, text="Generer la commande", bg=BTN, fg=FG,
              font=("Consolas",9), command=self._preview_cmd).pack(anchor="w", pady=(4,0))

    # Help box
    help_frame = tk.Frame(left, bg="#181825", pady=4, padx=8)
    help_frame.pack(fill="x", pady=(6,0))
    tk.Label(help_frame, text="Preset = commande predeterminee",
             bg="#181825", fg="#a6e3a1", font=("Consolas",8)).pack(anchor="w")
    tk.Label(help_frame, text="Personnalise = packages a patcher ET/OU a exclure",
             bg="#181825", fg="#f9e2af", font=("Consolas",8)).pack(anchor="w")
    tk.Label(help_frame, text="La commande finale est toujours editable avant GO",
             bg="#181825", fg="#6c7086", font=("Consolas",8)).pack(anchor="w")

    # Options: both checkboxes start checked
    self.exclude_kernel_var = tk.BooleanVar(value=True)
    self.dryrun_var = tk.BooleanVar(value=True)

    tk.Checkbutton(left, text="☑ Exclure kernel (recommandé)",
                   variable=self.exclude_kernel_var,
                   bg=BG, fg=self.YELLOW, selectcolor=BG2,
                   activebackground=BG,
                   font=("Consolas",10,"bold")).pack(anchor="w", pady=(10,2))

    tk.Checkbutton(left, text="🔍 Dry Run (simulation)",
                   variable=self.dryrun_var,
                   bg=BG, fg=self.ORANGE, selectcolor=BG2,
                   activebackground=BG,
                   font=("Consolas",10,"bold")).pack(anchor="w", pady=2)

    # CyberArk password (masked)
    tk.Label(left, text="Mot de passe CyberArk (prod) :",
             bg=BG, fg=FG, font=("Consolas",10)).pack(anchor="w", pady=(12,2))
    self.cybpwd_var = tk.StringVar()
    tk.Entry(left, textvariable=self.cybpwd_var, show="●",
             bg=BG2, fg=FG, font=("Consolas",10),
             insertbackground=FG, width=28).pack(anchor="w")

    # GO button
    self.go_btn = tk.Button(left, text="🚀 GO PATCH",
                            bg="#d20f39", fg="white",
                            font=("Consolas",16,"bold"),
                            pady=12, padx=20,
                            command=self._go_patch)
    self.go_btn.pack(fill="x", pady=16)

    # STOP button: created disabled
    self.stop_btn = tk.Button(left, text="⏹ STOP",
                              bg="#6c7086", fg="white",
                              font=("Consolas",11,"bold"),
                              pady=6, state="disabled",
                              command=self._stop)
    self.stop_btn.pack(fill="x")

    # Right column: real-time log
    right = tk.Frame(top, bg=BG)
    right.pack(side="left", fill="both", expand=True)

    tk.Label(right, text="DÉROULÉ EN TEMPS RÉEL",
             bg=BTN, fg=ACCENT,
             font=("Consolas",11,"bold"), pady=4).pack(fill="x")

    self.patch_log = scrolledtext.ScrolledText(
        right, height=28, bg="#11111b", fg=FG,
        font=("Consolas",9), insertbackground=FG, state="disabled")
    self.patch_log.pack(fill="both", expand=True, pady=4)

    for tag, color in [("ok",self.GREEN),("ko",self.RED),
                       ("warn",self.YELLOW),("info",ACCENT),
                       ("cmd","#cba6f7")]:
        self.patch_log.tag_configure(tag, foreground=color)

    # Progress bar
    pf = tk.Frame(tab, bg=BG)
    pf.pack(fill="x", padx=10, pady=4)
    self.prog_lbl = tk.Label(pf, text="En attente...",
                             bg=BG, fg=FG, font=("Consolas",9))
    self.prog_lbl.pack(side="left", padx=4)
    self.prog_var = tk.DoubleVar()
    ttk.Progressbar(pf, variable=self.prog_var,
                    maximum=100, length=600).pack(side="left", fill="x", expand=True, padx=4)
|
|
|
|
# ==========================================================================
|
|
# ONGLET 3 — POST-PATCHING
|
|
# ==========================================================================
|
|
def _build_tab_post(self):
    """Build tab 3: post-patching action buttons above a read-only log.

    Two rows of buttons are created (checks, kernel clean-up, yum undo,
    reboot, Excel marking; then snapshot clean-up), followed by the log
    widget and its colour tags.
    """
    tab = self.tab_post
    bg, fg, accent, btn = self.BG, self.FG, self.ACCENT, self.BTN
    bold11 = ("Consolas",11,"bold")

    tk.Label(tab, text="POST-PATCHING", bg=btn, fg=accent,
             font=("Consolas",12,"bold"), pady=6).pack(fill="x", padx=10, pady=8)

    def action_button(parent, caption, colour, callback):
        """Add one white-on-colour action button, packed to the left."""
        tk.Button(parent, text=caption,
                  bg=colour, fg="white", font=bold11,
                  pady=6, command=callback).pack(side="left", padx=4)

    # First row of buttons
    first_row = tk.Frame(tab, bg=bg)
    first_row.pack(fill="x", padx=10, pady=4)
    for caption, colour, callback in (
        ("🔍 Check post-patching", "#1e66f5", self._post_patch_check),
        ("🗑 Supprimer anciens kernels", "#e64553", self._remove_old_kernels),
        ("↩ Undo yum", "#fe640b", self._undo_yum),
        ("🔄 Reboot serveurs sélectionnés", "#8839ef", self._reboot_servers),
        ("🟩 Marquer patchés dans Excel", "#40a02b", self._ask_mark_patched),
    ):
        action_button(first_row, caption, colour, callback)

    # Second row of buttons
    second_row = tk.Frame(tab, bg=bg)
    second_row.pack(fill="x", padx=10, pady=2)
    action_button(second_row, "🗑 Supprimer snapshots > 3 jours",
                  "#e64553", self._delete_old_snapshots)

    self.post_log = scrolledtext.ScrolledText(
        tab, bg="#11111b", fg=fg,
        font=("Consolas",9), insertbackground=fg, state="disabled")
    self.post_log.pack(fill="both", expand=True, padx=10, pady=4)

    tag_colours = (("ok", self.GREEN), ("ko", self.RED),
                   ("warn", self.YELLOW), ("info", accent))
    for tag, colour in tag_colours:
        self.post_log.tag_configure(tag, foreground=colour)
|
|
|
|
# ==========================================================================
|
|
# ONGLET 4 — RAPPORT PATCHING (style dashboard)
|
|
# ==========================================================================
|
|
def _build_tab_report(self):
    """Build the report tab: title bar, KPI tiles, per-server table and raw log.

    Widgets stored on ``self`` for later use:
      * ``kpi_widgets`` -- dict mapping a KPI key to the Label showing its value,
      * ``report_tree`` -- Treeview with one row per server,
      * ``report_txt``  -- read-only ScrolledText used for the raw log.
    """
    tab = self.tab_report
    BG = self.BG
    FG = self.FG
    ACCENT = self.ACCENT
    BTN = self.BTN

    # -- Title bar + action buttons ------------------------------------------
    hf = tk.Frame(tab, bg=BTN, pady=6)
    hf.pack(fill="x", padx=0, pady=0)
    tk.Label(hf, text="RAPPORT DE PATCHING",
             bg=BTN, fg=ACCENT, font=("Consolas", 12, "bold")).pack(side="left", padx=12)

    bf = tk.Frame(hf, bg=BTN)
    bf.pack(side="right", padx=8)
    tk.Button(bf, text="Generer", bg="#40a02b", fg="white",
              font=("Consolas", 10, "bold"), pady=3,
              command=self._generate_report).pack(side="left", padx=4)
    tk.Button(bf, text="CSV", bg=BTN, fg=FG,
              font=("Consolas", 10), pady=3,
              command=self._export_csv).pack(side="left", padx=4)
    tk.Button(bf, text="DokuWiki", bg=BTN, fg=FG,
              font=("Consolas", 10), pady=3,
              command=self._export_dokuwiki).pack(side="left", padx=4)

    # -- KPI tiles (one row of counters) -------------------------------------
    kpi_frame = tk.Frame(tab, bg=BG)
    kpi_frame.pack(fill="x", padx=10, pady=8)

    # Each entry: (key, caption, tile background, text colour).
    self.kpi_widgets = {}
    kpi_defs = [
        ("total", "Total traités", "#313244", FG),
        ("ok", "Connexion OK", "#1a4a2e", self.GREEN),
        ("patched", "Patchés", "#1a4a2e", self.GREEN),
        ("uptodate", "Déjà à jour", "#2a2a3e", ACCENT),
        ("reboot", "Reboot requis", "#4a3000", self.YELLOW),
        ("ko", "KO / Auth", "#4a1a1a", self.RED),
    ]
    for key, label, bg_col, fg_col in kpi_defs:
        frame = tk.Frame(kpi_frame, bg=bg_col, padx=16, pady=10,
                         relief="flat", bd=0)
        frame.pack(side="left", fill="x", expand=True, padx=4)
        # Value (placeholder dash until a report is generated)
        val_lbl = tk.Label(frame, text="—", bg=bg_col, fg=fg_col,
                           font=("Consolas", 24, "bold"))
        val_lbl.pack()
        # Caption
        tk.Label(frame, text=label, bg=bg_col, fg=fg_col,
                 font=("Consolas", 9)).pack()
        self.kpi_widgets[key] = val_lbl

    # -- Detailed result table -----------------------------------------------
    tk.Label(tab, text="DETAIL PAR SERVEUR",
             bg=BG, fg=ACCENT, font=("Consolas", 10, "bold")).pack(anchor="w", padx=12, pady=(4, 2))

    tree_frame = tk.Frame(tab, bg=BG)
    tree_frame.pack(fill="both", expand=True, padx=10, pady=4)

    rcols = ("serveur", "env", "domaine", "accord", "snap", "connexion", "patch", "reboot", "detail")
    self.report_tree = ttk.Treeview(tree_frame, columns=rcols,
                                    show="headings", height=14)
    rhdrs = {
        "serveur": "Serveur",
        "env": "Env",
        "domaine": "Domaine",
        "accord": "Accord",
        "snap": "Snapshot",
        "connexion": "Connexion",
        "patch": "Patch",
        "reboot": "Reboot",
        "detail": "Detail packages",
    }
    rwidths = {
        "serveur": 200, "env": 80, "domaine": 100, "accord": 70,
        "snap": 80, "connexion": 90, "patch": 100, "reboot": 70, "detail": 280,
    }
    for c in rcols:
        self.report_tree.heading(c, text=rhdrs[c])
        # Free-text columns are left-aligned, everything else centred.
        self.report_tree.column(c, width=rwidths[c],
                                anchor="center" if c not in ("serveur", "detail") else "w")

    self.report_tree.tag_configure("ok", foreground=self.GREEN)
    self.report_tree.tag_configure("ko", foreground=self.RED)
    self.report_tree.tag_configure("warn", foreground=self.YELLOW)
    self.report_tree.tag_configure("patched", foreground=self.GREEN, background="#1a2e1a")
    self.report_tree.tag_configure("reboot", foreground=self.YELLOW, background="#2e2a00")
    self.report_tree.tag_configure("uptodate", foreground=ACCENT)

    vsb = ttk.Scrollbar(tree_frame, orient="vertical", command=self.report_tree.yview)
    hsb = ttk.Scrollbar(tree_frame, orient="horizontal", command=self.report_tree.xview)
    self.report_tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
    # pack() hands out space in call order: the scrollbars must claim their
    # edges BEFORE the expanding tree, otherwise the horizontal bar only gets
    # whatever sliver is left between the tree and the vertical bar.
    hsb.pack(side="bottom", fill="x")
    vsb.pack(side="right", fill="y")
    self.report_tree.pack(side="left", fill="both", expand=True)

    # -- Raw text log (compact) ----------------------------------------------
    tk.Label(tab, text="LOG BRUT",
             bg=BG, fg="#6c7086", font=("Consolas", 9)).pack(anchor="w", padx=12, pady=(4, 0))
    self.report_txt = scrolledtext.ScrolledText(
        tab, height=6, bg="#11111b", fg=FG,
        font=("Consolas", 8), insertbackground=FG, state="disabled")
    self.report_txt.pack(fill="x", padx=10, pady=(0, 6))

    for tag, color in [("ok", self.GREEN), ("ko", self.RED),
                       ("warn", self.YELLOW), ("info", ACCENT),
                       ("title", "#cba6f7")]:
        # Only the "title" tag is rendered larger and bold.
        self.report_txt.tag_configure(
            tag, foreground=color,
            font=("Consolas", 9, "bold") if tag == "title" else ("Consolas", 8))
|
|
|
|
# ==========================================================================
|
|
# HELPERS LOG
|
|
# ==========================================================================
|
|
def _log(self, widget, msg, tag="info"):
    """Append a timestamped line to a read-only text widget.

    The write itself is scheduled with ``self.root.after(0, ...)``; the widget
    is switched to ``state="normal"`` for the insert and back to
    ``"disabled"`` afterwards.

    Args:
        widget: text widget exposing ``configure``, ``insert`` and ``see``.
        msg: message text (a newline is appended).
        tag: text tag applied to the inserted line.
    """
    # Stamp the message now, not when the scheduled callback finally runs,
    # so the displayed time is the time of the event.
    ts = datetime.now().strftime("%H:%M:%S")
    line = f"[{ts}] {msg}\n"

    def _do():
        widget.configure(state="normal")
        widget.insert("end", line, tag)
        widget.see("end")
        widget.configure(state="disabled")

    self.root.after(0, _do)
|
|
|
|
def _log_patch(self, msg, tag="info"):
    """Write ``msg`` to the patching log widget (``self.patch_log``) via ``_log``."""
    target = self.patch_log
    self._log(target, msg, tag)
|
|
|
|
def _log_post(self, msg, tag="info"):
    """Write ``msg`` to the post-patching log widget (``self.post_log``) via ``_log``."""
    target = self.post_log
    self._log(target, msg, tag)
|
|
|
|
# ==========================================================================
|
|
# PREREQUIS — SSH + Disque + Satellite + Snapshot
|
|
# ==========================================================================
|
|
def _check_prerequisites(self):
    """Check SSH access, free disk space and Satellite status of the selection.

    Only servers that are selected and have ``accord == "oui"`` are checked.
    If any of them is a production server and no CyberArk password is known
    yet, the password is taken from the patch tab field or asked in a modal
    dialog. The checks run in a daemon thread; when they finish,
    ``_show_prereq_results`` is scheduled on the Tk loop.
    """
    selected = [s for s in self.servers if s.get("selected") and s["accord"] == "oui"]
    if not selected:
        messagebox.showwarning("Prerequis", "Aucun serveur selectionne avec accord=OUI")
        return

    # Reload the SSH keys
    self._reload_keys()

    def ask_psmp_password():
        """Show a modal password prompt; return the stripped entry or None."""
        dlg = tk.Toplevel(self.root)
        dlg.title("CyberArk PSMP")
        dlg.configure(bg="#1e1e2e")
        dlg.resizable(False, False)
        result_pwd = [None]
        tk.Label(dlg, text=f"Mot de passe CyberArk ({self.settings.get('cybr_user', 'CYBP01336')}) :",
                 bg="#1e1e2e", fg="#cdd6f4", font=("Consolas", 11)).pack(padx=20, pady=(15, 5))
        pwd_var = tk.StringVar()
        pwd_entry = tk.Entry(dlg, textvariable=pwd_var, show="*", bg="#2a2a3e", fg="#cdd6f4",
                             font=("Consolas", 11), insertbackground="#cdd6f4", width=30)
        pwd_entry.pack(padx=20, pady=5)
        pwd_entry.focus_set()

        def _ok(event=None):
            result_pwd[0] = pwd_var.get().strip()
            dlg.destroy()

        pwd_entry.bind("<Return>", _ok)
        tk.Button(dlg, text="OK", bg="#40a02b", fg="white", font=("Consolas", 10, "bold"),
                  padx=20, command=_ok).pack(pady=10)
        center_window(dlg, 400, 140, parent=self.root)
        dlg.grab_set()
        dlg.wait_window()
        return result_pwd[0]

    # Production servers are reached through PSMP and need the CyberArk password.
    # `or ""` guards against an env value of None.
    has_prod = any("prod" in (s.get("env") or "").lower() for s in selected)
    if has_prod and not self.cyb_pwd:
        # First try the CyberArk password field of the patch tab
        pwd = self.cybpwd_var.get().strip() if hasattr(self, 'cybpwd_var') else ""
        if not pwd:
            pwd = ask_psmp_password()
            if not pwd:
                messagebox.showwarning("Prerequis", "Mot de passe PSMP requis pour les serveurs Production")
                return
            if hasattr(self, 'cybpwd_var'):
                self.cybpwd_var.set(pwd)
        self.cyb_pwd = pwd

    self.prereq_btn.configure(state="disabled", text="Verification en cours...")
    self.prereq_status.configure(text="")
    self.audit("PREREQ_START", f"{len(selected)} serveurs (prod={has_prod})")

    self._log_patch(f"Prerequis : {len(selected)} serveurs a verifier...", "info")

    def free_mb(client, mount):
        """Return the free space of ``mount`` in MB, or -1 when unreadable."""
        out, _ = run_cmd(client, "df -BM " + mount + " | awk 'NR==2{print $4}' | tr -d 'M'", 10)
        try:
            return int(out.strip())
        except (ValueError, AttributeError):
            return -1

    def check_disk(client, mount, minimum):
        """Log and return the free space of ``mount`` against ``minimum`` MB."""
        self._log_patch(f" Disque {mount} ...", "info")
        free = free_mb(client, mount)
        if free >= minimum:
            self._log_patch(f" Disque {mount} : {free} Mo libre (OK >= {minimum})", "ok")
        else:
            self._log_patch(f" Disque {mount} : {free} Mo libre (KO < {minimum})", "ko")
        return free

    def check_one(idx, total, s):
        """Run every check for one server and return its result dict."""
        server = s["server"]
        env = s.get("env", "")
        r = {"ssh": False, "disk_root": None, "disk_log": None, "satellite": None,
             "is_vm": server.lower().startswith("v")}

        # Defaults bind the current values: the lambda runs later on the Tk loop.
        self.root.after(0, lambda sv=server, i=idx: self.prereq_status.configure(
            text=f"[{i+1}/{total}] {sv}"))
        self._log_patch(f"\n[{idx+1}/{total}] {server} ({env})", "info")

        client = None
        try:
            # 1. SSH
            self._log_patch(f" SSH...", "info")
            is_prod = "prod" in (env or "").lower()
            method = "PSMP" if is_prod else "cle SSH"
            client, eff = ssh_connect(server, env, self.settings,
                                      self.pkey, self.pkey2, self.cyb_pwd)
            if not client:
                r["error"] = "Connexion SSH impossible"
                self._log_patch(f" SSH ({method}) ECHEC", "ko")
                return r

            r["ssh"] = True
            self._log_patch(f" SSH ({method}) OK : {eff}", "ok")

            # 2. Free disk space on / and /var/log
            r["disk_root"] = check_disk(client, "/", 1200)
            r["disk_log"] = check_disk(client, "/var/log", 500)

            # 3. Satellite registration
            # NOTE(review): the "valide" substring would also match a French
            # "invalide"/"non valide" status line -- confirm the real output.
            self._log_patch(f" Satellite ...", "info")
            sat_out, _ = run_cmd(client, "subscription-manager status 2>/dev/null | head -5", 15)
            if sat_out and ("current" in sat_out.lower() or "valide" in sat_out.lower()):
                r["satellite"] = True
                self._log_patch(f" Satellite : OK (enregistre)", "ok")
            elif sat_out and "unknown" in sat_out.lower():
                r["satellite"] = False
                self._log_patch(f" Satellite : NON enregistre", "ko")
            else:
                r["satellite"] = None
                self._log_patch(f" Satellite : N/A (subscription-manager absent)", "warn")
        except Exception as exc:
            # An interrupted check must block the server: report it as
            # unreachable rather than leave half-filled results behind.
            r["ssh"] = False
            r["error"] = f"Erreur inattendue : {exc}"
            self._log_patch(f" Verification interrompue : {exc}", "ko")
        finally:
            # Always release the SSH session, even when a check raised.
            if client:
                try:
                    client.close()
                except Exception as exc:
                    self._log_patch(f" Fermeture SSH : {exc}", "warn")
        return r

    def worker():
        results = {}
        total = len(selected)
        try:
            for idx, s in enumerate(selected):
                results[s["server"]] = check_one(idx, total, s)
        finally:
            # Always hand back to the UI so the button is re-enabled.
            self.root.after(0, lambda: self._show_prereq_results(results, selected))

    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def _show_prereq_results(self, results, selected):
    """Summarise the prerequisite checks and deselect the blocked servers.

    Args:
        results: dict mapping a server name to the result dict produced by
            the prerequisite worker (keys read here: ``ssh``, ``disk_root``,
            ``disk_log``, ``satellite``).
        selected: the server dicts that were checked; each one receives a
            ``prereq_status`` of ``SSH_KO``, ``DISK_KO``, ``SAT_KO`` or ``OK``.

    Blocking problems are SSH failure, less than 1200 MB free on ``/``, less
    than 500 MB free on ``/var/log`` and a server not registered on Satellite.
    """
    self.prereq_btn.configure(state="normal", text="Verifier prerequis")
    self._log_patch(f"\n{'='*50}", "info")
    self._log_patch(f"RESUME PREREQUIS", "info")
    self._log_patch(f"{'='*50}", "info")

    def is_known(value):
        """True when a disk figure was actually measured (not None / -1)."""
        return value is not None and value >= 0

    # Build the report
    ok_servers = []
    ko_servers = []
    warn_servers = []

    for s in selected:
        server = s["server"]
        r = results.get(server, {})
        disk_root = r.get("disk_root")
        disk_log = r.get("disk_log")

        if not r.get("ssh"):
            ko_servers.append((server, "SSH connexion impossible"))
            s["prereq_status"] = "SSH_KO"
        elif is_known(disk_root) and disk_root < 1200:
            ko_servers.append((server, f"/ : {disk_root} Mo libre (min 1200 Mo)"))
            s["prereq_status"] = "DISK_KO"
        elif is_known(disk_log) and disk_log < 500:
            ko_servers.append((server, f"/var/log : {disk_log} Mo libre (min 500 Mo)"))
            s["prereq_status"] = "DISK_KO"
        elif r.get("satellite") is False:
            ko_servers.append((server, "Satellite : non enregistre"))
            s["prereq_status"] = "SAT_KO"
        else:
            ok_servers.append(server)
            s["prereq_status"] = "OK"
            if r.get("satellite") is None:
                warn_servers.append((server, "subscription-manager absent (Debian/physique ?)"))
            # An unreadable disk figure does not block, but must not pass silently.
            if not (is_known(disk_root) and is_known(disk_log)):
                warn_servers.append((server, "Espace disque illisible (/ ou /var/log)"))

    # Summary in the log
    for srv in ok_servers:
        self._log_patch(f" OK : {srv}", "ok")
    for srv, reason in warn_servers:
        self._log_patch(f" WARN : {srv} — {reason}", "warn")
    for srv, reason in ko_servers:
        self._log_patch(f" KO : {srv} — {reason}", "ko")
    self._log_patch(f"\nTotal : {len(ok_servers)} OK, {len(warn_servers)} WARN, {len(ko_servers)} KO", "info")

    # Result dialog text
    parts = [
        f"Prerequis verifies : {len(selected)} serveurs\n\n",
        f"OK : {len(ok_servers)}\n",
    ]
    if warn_servers:
        parts.append(f"Avertissements : {len(warn_servers)}\n")
    if ko_servers:
        parts.append(f"BLOQUANTS : {len(ko_servers)}\n\n")
        parts.append("Serveurs bloques :\n")
        parts.extend(f" {srv} : {reason}\n" for srv, reason in ko_servers)
        parts.append("\nDeselectionnez les serveurs bloques pour continuer.")
    msg = "".join(parts)

    if ko_servers:
        # Automatically deselect the blocked servers
        blocked = {srv for srv, _ in ko_servers}
        for s in self.servers:
            if s["server"] in blocked:
                s["selected"] = False
        self._apply_filters()
        messagebox.showwarning("Prerequis — problemes detectes", msg)
        self.prereq_status.configure(text=f"{len(ko_servers)} serveurs bloques", fg=self.RED)
    else:
        messagebox.showinfo("Prerequis OK", msg)
        self.prereq_status.configure(text=f"{len(ok_servers)} serveurs OK", fg=self.GREEN)
        # Enable the snapshot button when at least one checked server is a VM
        # (name starting with "v").
        vm_count = sum(1 for s in selected if s.get("prereq_status") == "OK"
                       and s["server"].lower().startswith("v"))
        if vm_count > 0:
            self.snap_btn.configure(state="normal", bg="#e64553", fg="white")

    self.audit("PREREQ_DONE", f"OK={len(ok_servers)} KO={len(ko_servers)} WARN={len(warn_servers)}")
|
|
|
|
# ==========================================================================
|
|
# SNAPSHOT — force sans snap
|
|
# ==========================================================================
|
|
def _force_patch_without_snap(self, server_name):
    """Ask for a written justification before patching a VM without a snapshot.

    Shows a modal dialog. Returns the justification (at least 5 characters
    after stripping) when the user confirms, otherwise ``None``. A confirmed
    justification is recorded through ``self.audit``.
    """
    dark = "#1e1e2e"
    light = "#cdd6f4"
    justification = None

    dlg = tk.Toplevel(self.root)
    dlg.title("Patching sans snapshot")
    dlg.configure(bg=dark)
    dlg.resizable(False, False)

    banner = tk.Label(dlg, text=f"ATTENTION — VM sans snapshot", bg="#181825", fg="#f38ba8",
                      font=("Consolas", 12, "bold"), pady=8)
    banner.pack(fill="x")
    target = tk.Label(dlg, text=f"Serveur : {server_name}", bg=dark, fg=light,
                      font=("Consolas", 11))
    target.pack(pady=(10, 5))
    prompt = tk.Label(dlg, text="Justification obligatoire :", bg=dark, fg="#f9e2af",
                      font=("Consolas", 10))
    prompt.pack(anchor="w", padx=20)

    justif_var = tk.StringVar()
    field = tk.Entry(dlg, textvariable=justif_var, bg="#2a2a3e", fg=light,
                     font=("Consolas", 10), insertbackground=light, width=50)
    field.pack(padx=20, pady=5)

    def accept():
        nonlocal justification
        text = justif_var.get().strip()
        if len(text) >= 5:
            justification = text
            dlg.destroy()
        else:
            messagebox.showwarning("Justification", "Minimum 5 caracteres")

    buttons = tk.Frame(dlg, bg=dark)
    buttons.pack(pady=10)
    tk.Button(buttons, text="Forcer le patching", bg="#d20f39", fg="white",
              font=("Consolas", 10, "bold"), command=accept).pack(side="left", padx=8)
    tk.Button(buttons, text="Annuler", bg="#313244", fg=light,
              font=("Consolas", 10), command=dlg.destroy).pack(side="left", padx=8)

    center_window(dlg, 500, 220, parent=self.root)
    dlg.grab_set()
    dlg.wait_window()  # blocks until the dialog is closed

    if justification:
        self.audit("FORCE_PATCH_NO_SNAP", f"{server_name} : {justification}")
    return justification
|
|
|
|
# ==========================================================================
|
|
# A PROPOS
|
|
# ==========================================================================
|
|
def _show_about(self):
    """Open the modal "about" window (product name, version, team, close button)."""
    dark = "#1e1e2e"
    light = "#cdd6f4"
    muted = "#6c7086"

    dlg = tk.Toplevel(self.root)
    dlg.title("A propos")
    dlg.configure(bg=dark)
    dlg.resizable(False, False)

    def add_label(text, colour, font, pack_opts=None, **extra):
        """Create one label on the dialog and pack it with ``pack_opts``."""
        tk.Label(dlg, text=text, bg=dark, fg=colour, font=font, **extra).pack(**(pack_opts or {}))

    def add_rule():
        """Insert a thin horizontal separator."""
        tk.Frame(dlg, bg="#313244", height=1).pack(fill="x", padx=30, pady=12)

    # Accent stripe across the top of the window
    tk.Frame(dlg, bg="#e94560", height=3).pack(fill="x")

    add_label("SANEF Linux Patch Manager", "#89b4fa", ("Consolas", 16, "bold"),
              {"pady": (20, 4)})
    add_label(f"Version {VERSION}", light, ("Consolas", 12))
    add_rule()
    add_label("SANEF DSI — Securite Operationnelle", light, ("Consolas", 11))
    add_label("(KM)", "#a6e3a1", ("Consolas", 11), {"pady": (4, 0)})
    add_label("SECOPS 2026", muted, ("Consolas", 10), {"pady": (2, 0)})
    add_rule()
    add_label("Orchestration du patching Linux\nvia Excel + SSH + vSphere", muted,
              ("Consolas", 9), justify="center")

    close_btn = tk.Button(dlg, text="Fermer", bg="#313244", fg=light,
                          font=("Consolas", 10), padx=20, pady=4,
                          command=dlg.destroy)
    close_btn.pack(pady=16)

    center_window(dlg, 380, 320, parent=self.root)
    dlg.grab_set()
|
|
|
|
# ==========================================================================
|
|
# SETTINGS
|
|
# ==========================================================================
|
|
def _open_settings(self):
    """Open the settings dialog; on a truthy result adopt it and reload the SSH keys."""
    dialog = SettingsDialog(self.root, self.settings)
    updated = dialog.result
    if not updated:
        return
    self.settings = updated
    self._reload_keys()
|
|
|
|
def _reload_keys(self):
    """Load both SSH keys named by the ``keyfile`` / ``keyfile2`` settings."""
    primary_path = self.settings.get("keyfile", "")
    self.pkey = load_key(primary_path)
    secondary_path = self.settings.get("keyfile2", "")
    self.pkey2 = load_key(secondary_path)
|
|
|
|
# ==========================================================================
|
|
# ONGLET 1 — LOGIQUE
|
|
# ==========================================================================
|
|
def _browse_excel(self):
    """Let the user pick a workbook, remember it in the settings and load it."""
    kinds = [("Excel", "*.xlsx *.xls"), ("All", "*.*")]
    chosen = filedialog.askopenfilename(filetypes=kinds)
    if not chosen:
        # Dialog cancelled: nothing to do
        return
    self.excel_var.set(chosen)
    self.settings["excel_file"] = chosen
    save_settings(self.settings)
    self._load_excel()
|
|
|
|
def _prev_week(self):
    """Switch to the sheet listed just before the current one, if there is one."""
    names = list(self.sheet_cb["values"])
    if not names:
        return
    current = self.sheet_var.get()
    try:
        position = names.index(current)
    except ValueError:
        # Unknown sheet name is treated as the first entry
        position = 0
    if position <= 0:
        return
    self.sheet_var.set(names[position - 1])
    self._load_sheet()
|
|
|
|
def _next_week(self):
    """Switch to the sheet listed just after the current one, if there is one."""
    names = list(self.sheet_cb["values"])
    if not names:
        return
    current = self.sheet_var.get()
    try:
        position = names.index(current)
    except ValueError:
        # Unknown sheet name is treated as the first entry
        position = 0
    if position >= len(names) - 1:
        return
    self.sheet_var.set(names[position + 1])
    self._load_sheet()
|
|
|
|
def _goto_current_week(self):
    """Select the sheet named after today's ISO week (``S<n>`` or ``S<nn>``), if present."""
    current_week = date.today().isocalendar()[1]
    available = list(self.sheet_cb["values"])
    candidates = (f"S{current_week}", f"S{current_week:02d}")
    match = next((name for name in candidates if name in available), None)
    if match is None:
        return
    self.sheet_var.set(match)
    self._load_sheet()
|
|
|
|
def _load_excel(self):
    """Open the configured workbook, list its sheets and load the best match.

    The sheet whose name carries today's ISO week number is preselected;
    otherwise the first sheet is used. Any failure is reported in an error
    dialog.
    """
    path = self.excel_var.get().strip()
    if not path or not os.path.exists(path):
        messagebox.showerror("Erreur", f"Fichier introuvable :\n{path}")
        return

    self.audit("LOAD_EXCEL", path)
    try:
        import io
        import re

        try:
            # Read the whole file into memory first and open it from there.
            with open(path, "rb") as f:
                data = io.BytesIO(f.read())
            wb = openpyxl.load_workbook(data, read_only=True, data_only=True)
        except Exception:
            # Fall back to letting openpyxl open the path itself.
            wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
        try:
            sheets = wb.sheetnames
        finally:
            wb.close()

        # Refresh the sheet combobox
        self.sheet_cb["values"] = sheets
        week_num = date.today().isocalendar()[1]

        def names_week(name):
            # Compare whole numbers: a plain substring test would make
            # week 5 match "S15" or "S50".
            return any(int(tok) == week_num for tok in re.findall(r"\d+", str(name)))

        week_sheet = next((name for name in sheets if names_week(name)), None)
        self.sheet_var.set(week_sheet or (sheets[0] if sheets else ""))

        self._load_sheet()
    except Exception as e:
        # This is a read path, so the message must not talk about modifying.
        messagebox.showerror("Erreur Excel", f"Impossible de lire l'Excel :\n{e}")
|
|
|
|
def _load_sheet(self):
    """Load the servers of the selected sheet and refresh filters, table and buttons.

    Sets ``self.servers`` and ``self.is_current_week``. When the sheet holds
    virtual servers with ``accord == "oui"`` and pyVmomi is available, the
    snapshot check is scheduled 200 ms later.
    """
    import re

    path = self.excel_var.get().strip()
    sheet = self.sheet_var.get()
    if not path or not sheet:
        return

    servers, _ = read_excel_servers(path, sheet)
    if servers is None:
        messagebox.showerror("Erreur", "Aucun serveur Linux trouvé")
        return

    # Is this the current week's sheet? Compare whole numbers found in the
    # name: a substring test would make week 5 match "S15" or "S50" and
    # wrongly show the sheet as patchable.
    week_num = date.today().isocalendar()[1]
    self.is_current_week = any(int(tok) == week_num for tok in re.findall(r"\d+", sheet))

    # Update the visual indicator
    if self.is_current_week:
        self.week_mode_lbl.configure(
            text="✅ SEMAINE COURANTE — Patch autorisé",
            fg=self.GREEN)
    else:
        self.week_mode_lbl.configure(
            text=f"👁 VUE SEULE ({sheet}) — Patch désactivé",
            fg=self.YELLOW)

    self.servers = servers

    # Populate the filter comboboxes with the distinct non-empty values
    envs = sorted({s["env"] for s in servers if s["env"]})
    domains = sorted({s["domain"] for s in servers if s["domain"]})
    apps = sorted({s["app"] for s in servers if s["app"]})
    patchers = sorted({s["patcher"].strip() for s in servers
                       if s.get("patcher") and s["patcher"].strip()})

    self.filter_env_cb["values"] = ["Tous"] + envs
    self.filter_domain_cb["values"] = ["Tous"] + domains
    self.filter_app_cb["values"] = ["Tous"] + apps
    if self.filter_patcher_cb:
        self.filter_patcher_cb["values"] = ["Tous"] + patchers
    self.filter_env_var.set("Tous")
    self.filter_domain_var.set("Tous")
    self.filter_app_var.set("Tous")

    # Preselect the patcher filter
    if self.current_user["role"] == "admin":
        patcher_setting = self.settings.get("patcher", "").strip()
        if patcher_setting and patcher_setting in patchers:
            self.filter_patcher_var.set(patcher_setting)
        else:
            self.filter_patcher_var.set("Tous")
    else:
        # Non-admin users are restricted inside _apply_filters
        self.filter_patcher_var.set("Tous")

    self._apply_filters()

    # Restyle the GO button according to the week (it stays clickable in
    # both cases; only colour and caption change here).
    if hasattr(self, "go_btn"):
        if self.is_current_week:
            self.go_btn.configure(bg="#d20f39", state="normal",
                                  text="🚀 GO PATCH")
        else:
            self.go_btn.configure(bg="#45475a", state="normal",
                                  text="👁 VUE SEULE — Patch désactivé")

    # Check snapshots when some non-physical servers have accord=oui
    accord_servers = [s for s in self.servers
                      if s.get("accord") == "oui" and not s.get("is_physical")]
    if accord_servers and VSPHERE_OK:
        self.root.after(200, self._prompt_vcenter_and_check_snaps)
|
|
|
|
def _apply_filters(self):
    """Filter ``self.servers`` with the current filter values and redraw the table.

    Non-admin users only see servers whose ``patcher`` equals their username
    (case-insensitive). The patcher, environment, domain, application and
    "today only" filters are then applied; "Tous" disables a filter.
    """
    env_f = self.filter_env_var.get()
    domain_f = self.filter_domain_var.get()
    app_f = self.filter_app_var.get()
    patcher_f = self.filter_patcher_var.get()
    today_f = self.filter_today_var.get()
    today = date.today()

    # Non-admin: restricted to the servers assigned to the logged-in user
    is_admin = self.current_user["role"] == "admin"
    my_name = self.current_user["username"].lower()
    patcher_cmp = patcher_f.strip().lower()

    filtered = []
    for s in self.servers:
        srv_patcher = (s.get("patcher", "") or "").strip()

        if not is_admin and srv_patcher.lower() != my_name:
            continue

        # Servers with no patcher assigned pass the patcher filter.
        if patcher_f != "Tous" and srv_patcher and srv_patcher.lower() != patcher_cmp:
            continue
        if env_f != "Tous" and s["env"] != env_f:
            continue
        if domain_f != "Tous" and s["domain"] != domain_f:
            continue
        if app_f != "Tous" and s["app"] != app_f:
            continue

        if today_f and s["date_patch"]:
            planned = s["date_patch"]
            # A datetime never compares equal to a date, so reduce it to its
            # calendar day before testing against today.
            if isinstance(planned, datetime):
                planned = planned.date()
            if planned != today:
                continue
        filtered.append(s)

    self._populate_tree(filtered)
|
|
|
|
def _populate_tree(self, servers):
    """Rebuild the server table from ``servers`` and update the counter label.

    Every displayed server gets a unique row id stored under ``s["_iid"]``:
    the server name, or ``<name>_#<n>`` for the n-th duplicate of a name.
    """
    self.srv_tree.delete(*self.srv_tree.get_children())

    # Drop row ids left over from a previous display: a hidden server that
    # kept an id now reused by a visible row would otherwise be returned by
    # an id lookup in place of the row actually shown.
    for known in getattr(self, "servers", None) or []:
        known.pop("_iid", None)

    # Occurrence counter used to disambiguate duplicated server names
    seen = {}

    for s in servers:
        accord_ok = s["accord"] == "oui"
        sel_mark = "✓" if s.get("selected") and accord_ok else ("✗" if accord_ok else "—")
        accord_lbl = "✅ OUI" if accord_ok else "❌ NON"
        snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "—")
        date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else "—"

        if s.get("already_patched"):
            tag = "already_patched"
        elif not accord_ok:
            tag = "no_accord"
        elif s.get("selected"):
            tag = "selected"
        else:
            tag = "normal"

        # Unique iid even when the same server name appears several times
        base_iid = s["server"]
        seen[base_iid] = seen.get(base_iid, 0) + 1
        iid = base_iid if seen[base_iid] == 1 else f"{base_iid}_#{seen[base_iid]}"
        s["_iid"] = iid  # remembered for _refresh_tree_item

        self.srv_tree.insert("", "end",
                             iid=iid,
                             values=(sel_mark, accord_lbl, s["server"],
                                     s.get("patcher", ""), s["env"],
                                     s["domain"], s["app"], date_str, snap_lbl, s["status"]),
                             tags=(tag,))

    n = sum(1 for s in servers if s.get("selected") and s["accord"] == "oui")
    self.count_lbl.configure(text=f"{len(servers)} serveurs affichés — {n} sélectionnés")
|
|
|
|
def _get_server(self, name):
    """Return the first server dict whose ``server`` field equals ``name``, else None."""
    matches = (entry for entry in self.servers if entry["server"] == name)
    return next(matches, None)
|
|
|
|
def _get_server_by_iid(self, iid):
    """Find a server by its table row id, falling back to a lookup by name."""
    by_row_id = next((entry for entry in self.servers if entry.get("_iid") == iid), None)
    if by_row_id is not None:
        return by_row_id
    # No stored row id matched: treat the iid as a plain server name
    return self._get_server(iid)
|
|
|
|
def _toggle_srv(self, event):
    """Toggle the selection of the clicked row; rows without accord are ignored."""
    tree = self.srv_tree
    if tree.identify_region(event.x, event.y) != "cell":
        return
    row_id = tree.identify_row(event.y)
    if not row_id:
        return
    # Resolve through the stored row id so duplicated names are handled
    entry = self._get_server_by_iid(row_id)
    if not entry:
        return
    if entry["accord"] != "oui":
        return  # the accord is mandatory
    entry["selected"] = not entry.get("selected", False)
    self._refresh_tree_item(entry)
    picked = len([x for x in self.servers if x.get("selected") and x["accord"] == "oui"])
    self.count_lbl.configure(text=f"— {picked} sélectionnés")
|
|
|
|
def _refresh_tree_item(self, s):
    """Redraw the table row of server ``s``; do nothing when it is not displayed.

    The row is located by the stored ``_iid``, then by server name, then by
    scanning the rows for a matching "server" value. Marks and tag follow the
    same rules as ``_populate_tree`` so a refresh never changes how a
    no-accord or already-patched server is rendered.
    """
    iid = s.get("_iid", s["server"])
    if not self.srv_tree.exists(iid):
        # Fallback: look for a row whose third value is the server name
        for item in self.srv_tree.get_children():
            vals = self.srv_tree.item(item, "values")
            if vals and len(vals) > 2 and vals[2] == s["server"]:
                iid = item
                s["_iid"] = iid  # remember it for next time
                break
        else:
            return  # server not in the table (filtered out)

    accord_ok = s["accord"] == "oui"
    selected = bool(s.get("selected"))

    if not accord_ok:
        sel_mark = "—"
    elif selected:
        sel_mark = "✓"
    else:
        sel_mark = "✗"

    if s.get("already_patched"):
        tag = "already_patched"
    elif not accord_ok:
        tag = "no_accord"
    elif selected:
        tag = "selected"
    else:
        tag = "normal"

    snap_lbl = "✅ OK" if s.get("snap_done") else ("🖥 PHY" if s.get("is_physical") else "—")
    date_str = s["date_patch"].strftime("%d/%m/%Y") if s.get("date_patch") else "—"
    self.srv_tree.item(iid, values=(
        sel_mark, "✅ OUI" if accord_ok else "❌ NON",
        s["server"], s.get("patcher", ""), s["env"], s["domain"], s["app"],
        date_str, snap_lbl, s["status"]), tags=(tag,))
|
|
|
|
def _select_all_ok(self):
    """Select every server with accord whose name is a row id of the table."""
    tree = self.srv_tree
    for entry in self.servers:
        if entry["accord"] != "oui":
            continue
        if not tree.exists(entry["server"]):
            continue
        entry["selected"] = True
        self._refresh_tree_item(entry)
    picked = len([entry for entry in self.servers if entry.get("selected")])
    self.count_lbl.configure(text=f"— {picked} sélectionnés")
|
|
|
|
def _select_none(self):
    """Deselect every server and redraw those whose name is a row id of the table."""
    tree = self.srv_tree
    for entry in self.servers:
        entry["selected"] = False
        if not tree.exists(entry["server"]):
            continue
        self._refresh_tree_item(entry)
    self.count_lbl.configure(text="— 0 sélectionnés")
|
|
|
|
def _invert(self):
    """Flip the selection of every server with accord whose name is a row id of the table."""
    tree = self.srv_tree
    for entry in self.servers:
        if entry["accord"] != "oui":
            continue
        if not tree.exists(entry["server"]):
            continue
        entry["selected"] = not entry.get("selected", False)
        self._refresh_tree_item(entry)
    picked = len([entry for entry in self.servers if entry.get("selected")])
    self.count_lbl.configure(text=f"— {picked} sélectionnés")
|
|
|
|
# ==========================================================================
|
|
# SNAPSHOT VSPHERE
|
|
# ==========================================================================
|
|
def _prompt_vcenter_and_check_snaps(self):
    """Ask for vCenter credentials if needed, then check snapshots in the background.

    Only selected, non-physical servers with ``accord == "oui"`` are checked.
    Credentials already held in ``self.settings`` (``vs_user`` and the
    in-memory ``_vs_pwd_session``) are reused without prompting.
    """
    accord_servers = [s for s in self.servers
                      if s.get("selected") and s.get("accord") == "oui" and not s.get("is_physical")]
    if not accord_servers:
        return

    def launch(user, pwd):
        """Log the start of the check and run the worker in a daemon thread."""
        self._log_patch(
            f"Vérification snapshots ({len(accord_servers)} serveurs avec accord)...", "info")
        threading.Thread(target=self._check_snap_worker,
                         args=(accord_servers, user, pwd), daemon=True).start()

    # Credentials already known: use them directly
    vs_user = self.settings.get("vs_user", "").strip()
    vs_pwd = self.settings.get("_vs_pwd_session", "").strip()
    if vs_user and vs_pwd:
        launch(vs_user, vs_pwd)
        return

    # Otherwise prompt for them
    dlg = tk.Toplevel(self.root)
    dlg.title("vSphere — Vérification snapshots")
    dlg.configure(bg=self.BG)
    dlg.grab_set()
    dlg.resizable(False, False)
    self._center_dialog(dlg, 400, 230)

    tk.Label(dlg, text="Connexion vCenter", bg=self.BG, fg=self.ACCENT,
             font=("Consolas", 12, "bold")).pack(pady=10)
    msg_lbl = f"Vérification snapshots pour {len(accord_servers)} serveur(s) avec accord."
    tk.Label(dlg, text=msg_lbl, bg=self.BG, fg=self.FG,
             font=("Consolas", 9), justify="center").pack(pady=(0, 8))

    for lbl, attr, show in [
        ("Utilisateur :", "dlg_vs_user", ""),
        ("Mot de passe :", "dlg_vs_pwd", "●"),
    ]:
        f = tk.Frame(dlg, bg=self.BG)
        f.pack(fill="x", padx=24, pady=4)
        tk.Label(f, text=lbl, bg=self.BG, fg=self.FG,
                 font=("Consolas", 10), width=14).pack(side="left")
        # Only the user field is pre-filled; the password always starts empty.
        var = tk.StringVar(value=self.settings.get("vs_user", "") if "Util" in lbl else "")
        setattr(self, attr, var)
        tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG,
                 font=("Consolas", 10), insertbackground=self.FG,
                 width=26).pack(side="left", padx=4)

    def do_check():
        user = self.dlg_vs_user.get().strip()
        pwd = self.dlg_vs_pwd.get().strip()
        if not user or not pwd:
            messagebox.showwarning("Attention", "Identifiants requis.", parent=dlg)
            return
        # Keep both in memory so the prompt is not shown again this session.
        self.settings["vs_user"] = user
        self.settings["_vs_pwd_session"] = pwd
        # Hand save_settings a copy without the session password so it
        # cannot be written out with the rest of the settings.
        # NOTE(review): save_settings is defined elsewhere; if it already
        # drops this key the copy is merely redundant.
        save_settings({k: v for k, v in self.settings.items() if k != "_vs_pwd_session"})
        dlg.destroy()
        launch(user, pwd)

    def skip():
        dlg.destroy()
        self._log_patch("Vérification snapshot ignorée.", "warn")

    bf = tk.Frame(dlg, bg=self.BG)
    bf.pack(pady=10)
    tk.Button(bf, text="🔍 Vérifier", bg=self.ACCENT, fg="white",
              font=("Consolas", 11, "bold"), padx=10, pady=4,
              command=do_check).pack(side="left", padx=6)
    tk.Button(bf, text="Ignorer", bg=self.BG2, fg=self.FG,
              font=("Consolas", 10), padx=10, pady=4,
              command=skip).pack(side="left", padx=6)

    dlg.bind("<Return>", lambda e: do_check())
|
|
|
|
def _check_snap_worker(self, servers, vs_user, vs_pwd):
    """Thread body: look for a ``PrePatch_`` snapshot of each server on the vCenters.

    Each host of ``VSPHERE_HOSTS`` is queried in turn for the servers not yet
    marked ``snap_done``; a server with a matching snapshot gets
    ``snap_done = True`` and its table row is refreshed on the Tk loop.

    Args:
        servers: server dicts to check (mutated in place).
        vs_user: vCenter user name.
        vs_pwd: vCenter password.
    """
    import ssl

    snap_prefix = "PrePatch_"

    def has_prepatch_snap(vm):
        """Return True if any snapshot of the VM has a name starting with PrePatch_."""
        if not vm.snapshot or not vm.snapshot.rootSnapshotList:
            return False

        def walk(snap_list):
            # Depth-first walk of the snapshot tree
            for snap in snap_list:
                if snap.name.startswith(snap_prefix):
                    return True
                if snap.childSnapshotList and walk(snap.childSnapshotList):
                    return True
            return False

        return walk(vm.snapshot.rootSnapshotList)

    for vc in VSPHERE_HOSTS:
        # Servers still without a snapshot found
        pending = [s for s in servers if not s.get("snap_done")]
        if not pending:
            break

        si = None
        try:
            # NOTE(review): certificate and hostname verification are
            # switched off for the vCenter connection.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd, sslContext=ctx)
            vc_content = si.RetrieveContent()

            for s in pending:
                vm = self._find_vm(vc_content, s["server"])
                if not vm:
                    continue
                if has_prepatch_snap(vm):
                    s["snap_done"] = True
                    self._log_patch(f" ✅ Snapshot existant : {s['server']}", "ok")
                    # Default argument binds this iteration's server
                    self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
                else:
                    self._log_patch(f" ⚠ Pas de snapshot : {s['server']}", "warn")
        except Exception as e:
            self._log_patch(f" Erreur vCenter {vc} : {e}", "ko")
        finally:
            # Close the session even when the lookup failed half-way.
            if si is not None:
                try:
                    Disconnect(si)
                except Exception as e:
                    self._log_patch(f" Erreur vCenter {vc} : {e}", "ko")

    # Summary
    ok = sum(1 for s in servers if s.get("snap_done"))
    ko = len(servers) - ok
    self._log_patch(
        f"Snapshots : {ok} trouvé(s), {ko} manquant(s) sur {len(servers)} serveur(s) avec accord",
        "ok" if ko == 0 else "warn")
|
|
|
|
def _center_dialog(self, dlg, w, h):
    """Center the ``w`` x ``h`` dialog ``dlg`` using the main window as parent.

    Thin wrapper that delegates to ``center_window``.
    """
    main_window = self.root
    center_window(dlg, w, h, parent=main_window)
|
|
|
|
def _take_snapshots(self):
    """Prompt for vSphere credentials, then snapshot the selected servers.

    Only servers that are selected and whose "accord" is "oui" are kept.
    The credential StringVars are stored on the instance as ``vs_user`` and
    ``vs_pwd``; the snapshots themselves are taken by ``_snap_worker`` in a
    daemon thread once the button is pressed.
    """
    selected = [srv for srv in self.servers
                if srv.get("selected") and srv["accord"] == "oui"]
    if not selected:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné (avec accord).")
        return

    if not VSPHERE_OK:
        messagebox.showwarning(
            "pyVmomi manquant",
            "pyVmomi non installé.\n\n"
            "Installer avec :\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080\n\n"
            "Après installation, relancer l'application.")
        return

    # vSphere credentials prompt
    dlg = tk.Toplevel(self.root)
    dlg.title("Identifiants vSphere")
    dlg.configure(bg=self.BG)
    dlg.grab_set()
    dlg.resizable(False, False)

    title = tk.Label(dlg, text="Connexion vSphere", bg=self.BG, fg=self.ACCENT,
                     font=("Consolas",12,"bold"))
    title.pack(pady=10)

    # (caption, instance attribute, masking character, settings key for the default)
    fields = (
        ("Utilisateur :", "vs_user", "", "vs_user"),
        ("Mot de passe :", "vs_pwd", "\u25cf", ""),
    )
    for caption, attr_name, mask, settings_key in fields:
        row = tk.Frame(dlg, bg=self.BG)
        row.pack(fill="x", padx=20, pady=4)
        caption_lbl = tk.Label(row, text=caption, bg=self.BG, fg=self.FG,
                               font=("Consolas",10), width=14)
        caption_lbl.pack(side="left")
        value_var = tk.StringVar(value=self.settings.get(settings_key, ""))
        setattr(self, attr_name, value_var)
        entry = tk.Entry(row, textvariable=value_var, show=mask, bg=self.BG2,
                         fg=self.FG, font=("Consolas",10), insertbackground=self.FG)
        entry.pack(side="left", fill="x", expand=True)
    center_window(dlg, 400, 200, parent=self.root)

    def do_snap():
        """Read the credentials, close the dialog and start the worker thread."""
        user = self.vs_user.get().strip()
        pwd = self.vs_pwd.get().strip()
        dlg.destroy()
        worker = threading.Thread(target=self._snap_worker,
                                  args=(selected, user, pwd), daemon=True)
        worker.start()

    snap_btn = tk.Button(dlg, text="📸 Prendre snapshots", bg="#e64553", fg="white",
                         font=("Consolas",11,"bold"), pady=6,
                         command=do_snap)
    snap_btn.pack(pady=10)
|
|
|
|
def _snap_worker(self, servers, vs_user, vs_pwd):
    """Worker thread: create a ``PrePatch_YYYYMMDD`` snapshot for each server.

    The vCenters of VSPHERE_HOSTS are tried in order; a server is handled by
    the first vCenter on which ``_find_vm`` locates it.  Servers already
    flagged ``snap_done`` are skipped.

    Args:
        servers: list of server dicts; reads "server", "snap_done" and
            "is_physical", sets "snap_done" to True in place on success.
        vs_user: vSphere user name.
        vs_pwd: vSphere password.
    """
    import ssl

    snap_name = f"PrePatch_{date.today():%Y%m%d}"
    pending = [s for s in servers if not s.get("snap_done")]
    # Names of the servers whose VM was located on at least one vCenter, so
    # the final summary does not claim "not found" for a failed snapshot.
    located = set()

    for vc in VSPHERE_HOSTS:
        # Nothing left to do once every server has its snapshot.
        pending = [s for s in pending if not s.get("snap_done")]
        if not pending:
            break

        self._log_patch(f"Connexion vCenter : {vc}", "info")
        si = None
        try:
            # NOTE: certificate verification is disabled for the vCenter connection.
            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd,
                              sslContext=ctx)
            content = si.RetrieveContent()

            for s in pending:
                if s.get("snap_done"):
                    continue  # already done on a previous vCenter
                vm = self._find_vm(content, s["server"])
                if not vm:
                    # Not logged here: the VM may live on another vCenter.
                    continue
                located.add(s["server"])
                try:
                    task = vm.CreateSnapshot_Task(
                        name=snap_name,
                        description=f"Pre-patch automatique {date.today()}",
                        memory=False, quiesce=False)
                    # Poll the task until vSphere reports a final state.
                    while task.info.state not in ("success","error"):
                        time.sleep(1)
                    if task.info.state == "success":
                        s["snap_done"] = True
                        self._log_patch(f" ✅ Snapshot OK : {s['server']}", "ok")
                        # Refresh the row right away instead of waiting for the end.
                        self.root.after(0, lambda sv=s: self._refresh_tree_item(sv))
                    else:
                        self._log_patch(f" ❌ Snapshot KO : {s['server']} ({task.info.error})", "ko")
                except Exception as e:
                    self._log_patch(f" ❌ {s['server']} : {e}", "ko")
        except Exception as e:
            self._log_patch(f" Erreur vCenter {vc} : {e}", "ko")
        finally:
            # Always release the session, including after an error.
            if si is not None:
                try:
                    Disconnect(si)
                except Exception:
                    pass  # best effort: the session may already be gone

    # Final summary
    ok = [s for s in servers if s.get("snap_done")]
    ko = [s for s in servers if not s.get("snap_done") and not s.get("is_physical")]
    for s in ko:
        # Failed snapshots were already reported above with their own error.
        if s["server"] not in located:
            self._log_patch(f" ⚠ VM non trouvée sur aucun vCenter : {s['server']}", "warn")
    self._log_patch(
        f"Snapshots terminés : {len(ok)}/{len(servers)} OK", "ok" if not ko else "warn")

    def refresh_all():
        """Refresh every row of the tree once the run is over."""
        for s in servers:
            self._refresh_tree_item(s)

    self.root.after(0, refresh_all)
|
|
|
|
def _find_vm(self, content, name):
    """Find a VM by name in the whole vCenter inventory.

    The VM folder of every datacenter under ``content.rootFolder`` is walked
    depth-first.  An exact match (case-insensitive, against the full name or
    the name without its domain) always wins; only when no exact match
    exists anywhere is the first partial match returned (one name contained
    in the other).

    Args:
        content: object exposing ``rootFolder.childEntity`` (datacenters
            with a ``vmFolder``).
        name: server name, with or without domain suffix.

    Returns:
        The matching inventory object, or None when nothing matches or
        ``name`` is empty.
    """
    full_name = name.lower()
    short_name = full_name.split(".")[0]  # without the domain
    if not short_name:
        # An empty name would be "contained" in every VM name.
        return None

    partial_hits = []  # partial matches, in traversal order

    def walk(folder):
        """Return an exact match found under ``folder``, else None."""
        try:
            children = folder.childEntity
        except Exception:
            return None  # unreadable folder: skip it, keep searching elsewhere
        for obj in children:
            try:
                if hasattr(obj, "childEntity"):
                    # Container: search inside, never return the folder itself.
                    hit = walk(obj)
                    if hit is not None:
                        return hit
                    continue
                vm_name = (getattr(obj, "name", None) or "").lower()
            except Exception:
                continue  # one unreadable entry must not hide its siblings
            if not vm_name:
                continue
            if vm_name in (full_name, short_name):
                return obj
            if short_name in vm_name or vm_name in short_name:
                partial_hits.append(obj)
        return None

    try:
        datacenters = content.rootFolder.childEntity
    except Exception:
        return None

    for dc in datacenters:
        try:
            vm_folder = dc.vmFolder
        except Exception:
            continue  # root entry without a VM folder
        hit = walk(vm_folder)
        if hit is not None:
            return hit

    return partial_hits[0] if partial_hits else None
|
|
|
|
# ==========================================================================
|
|
# ONGLET 2 — LOGIQUE PATCH
|
|
# ==========================================================================
|
|
def _on_preset(self, event=None):
    """Apply the selected preset to the package fields and refresh the preview.

    The "CUSTOM" preset value enables both entry widgets and clears the
    package list; any other preset copies its package list into the field.
    The custom excludes are cleared in both cases.
    """
    preset_value = PRESET_PACKAGES.get(self.preset_var.get(), "")
    is_custom = preset_value == "CUSTOM"

    if is_custom:
        # Custom mode: make both fields editable.
        for entry in (self.packages_entry, self.custom_excludes_entry):
            entry.configure(state="normal")

    self.packages_var.set("" if is_custom else preset_value)
    self.custom_excludes_var.set("")
    self._preview_cmd()
|
|
|
|
def _preview_cmd(self):
    """Build the yum command for the current form state and display it.

    Custom mode (preset "Personnalise..." or any custom exclude) assembles
    the command from the package list and the exclude patterns; otherwise
    the command comes from ``build_yum_command`` for the "standard" domain.
    The result is written to ``final_cmd_var``.

    Kernel exclusion in custom mode: ``--exclude=*kernel*`` is added when
    the checkbox is set, unless a user pattern already mentions the kernel
    or an explicit package list is given without any exclude.
    """
    preset = self.preset_var.get()
    packages = self.packages_var.get().strip()
    custom_excl = self.custom_excludes_var.get().strip()
    exclude_kernel = self.exclude_kernel_var.get()
    dryrun = self.dryrun_var.get()

    if preset == "Personnalise..." or custom_excl:
        # Custom mode
        excludes = [f"--exclude={pattern}" for pattern in custom_excl.split()]

        # An explicit package list without excludes only touches those packages.
        kernel_relevant = bool(custom_excl) or not packages
        kernel_covered = any("kernel" in item.lower() for item in excludes)
        if exclude_kernel and kernel_relevant and not kernel_covered:
            excludes.append("--exclude=*kernel*")

        if dryrun:
            head, tail = "sudo yum check-update", "2>/dev/null | head -30"
        else:
            head, tail = "sudo yum update -y", "2>&1 | tail -25"

        parts = (head, packages, " ".join(excludes), tail)
        cmd = " ".join(part for part in parts if part)
    else:
        # Standard preset: delegate to build_yum_command
        cmd = build_yum_command("standard", packages, exclude_kernel, dryrun)
        if not cmd.startswith("sudo"):
            cmd = "sudo " + cmd

    self.final_cmd_var.set(cmd)
|
|
|
|
def _go_patch(self):
    """Validate the selection, ask for confirmation and start the patch thread.

    The run is refused outside the current week or without any selected
    server having "accord" set to "oui".  The user is warned about
    non-physical servers without a snapshot, and a real (non dry-run) patch
    needs an explicit confirmation.
    """
    # Patching is only allowed on the current week.
    if not self.is_current_week:
        messagebox.showwarning(
            "Patch desactive",
            "Vous consultez une autre semaine que la semaine courante.\n\n"
            "Le patch n'est autorise que sur la semaine en cours.\n"
            "Utilisez le bouton ↺ pour revenir a la semaine courante.")
        return

    selected = [srv for srv in self.servers
                if srv.get("selected") and srv["accord"] == "oui"]
    if not selected:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné avec accord.")
        return

    # Snapshot check for the virtual machines
    missing_snap = [srv for srv in selected
                    if not (srv.get("snap_done") or srv.get("is_physical"))]
    if missing_snap:
        names = "\n".join(srv["server"] for srv in missing_snap[:8])
        proceed = messagebox.askyesno(
            "⚠️ Snapshot manquant",
            f"{len(missing_snap)} serveur(s) sans snapshot :\n{names}\n\n"
            "Continuer SANS snapshot ?",
            icon="warning")
        if not proceed:
            return

    real_run = not self.dryrun_var.get()
    if real_run:
        confirmed = messagebox.askyesno(
            "Confirmation PATCH RÉEL",
            f"Lancer le PATCH RÉEL sur {len(selected)} serveur(s) ?\n\n"
            "Cette action est irréversible !",
            icon="warning")
        if not confirmed:
            return

    # Reset the run state and lock the buttons.
    self.cyb_pwd = self.cybpwd_var.get().strip() or None
    self.running = True
    self.apply_all_cmd = False
    self.results = []
    self.go_btn.configure(state="disabled")
    self.stop_btn.configure(state="normal")

    worker = threading.Thread(target=self._patch_worker, args=(selected,),
                              daemon=True)
    worker.start()
|
|
|
|
def _stop(self):
    """Clear the ``running`` flag and log that a stop was requested."""
    message, level = "⏹ Arrêt demandé...", "warn"
    self.running = False
    self._log_patch(message, level)
|
|
|
|
def _patch_worker(self, servers):
    """Worker thread: patch every server in turn and record the results.

    For each server: build the yum command, ask for confirmation (unless
    "apply to all" was chosen earlier), connect over SSH, run the pre-patch
    scripts, run yum, compare package versions, decide the patch status and
    check whether a reboot is needed.  One result dict per processed server
    is appended to ``self.results``.

    Safety rules enforced here:
      * only an explicit "apply" / "apply_all" answer triggers a patch; any
        other dialog outcome skips the server;
      * the SSH client is always closed and the UI always unlocked, even
        when a step raises;
      * "Nothing to do" is checked before "Complete!" (dnf prints both when
        there is nothing to update), and unrecognised yum output is reported
        as KO rather than as up to date.

    Args:
        servers: list of server dicts (reads "server", "domain", "env";
            status keys are written in place).
    """
    total = len(servers)
    packages = self.packages_var.get().strip()
    exclude_kernel = self.exclude_kernel_var.get()
    dryrun = self.dryrun_var.get()
    pkg_names = packages.split()
    approved_cmd = None  # command validated through "apply to all"

    def ask_confirmation(srv_name, cmd):
        """Show the confirmation dialog on the Tk thread; return (action, command)."""
        done = threading.Event()
        answer = {"action": None, "cmd": cmd}

        def show_dialog():
            try:
                dlg = YumConfirmDialog(self.root, srv_name, cmd)
                answer["action"] = dlg.result
                if dlg.result in ("apply", "apply_all"):
                    answer["cmd"] = dlg.final_cmd
            finally:
                # Always release the worker, even if the dialog failed.
                done.set()

        self.root.after(0, show_dialog)
        done.wait()
        return answer["action"], answer["cmd"]

    def rpm_versions(client):
        """Return {package: ``rpm -q`` output} for the requested packages."""
        versions = {}
        for pkg in pkg_names:
            out, _ = run_cmd(client, f"rpm -q {pkg} 2>/dev/null || echo NOT_INSTALLED", 15)
            versions[pkg] = out.strip()
        return versions

    def classify_dryrun(s, yum_out):
        """Set the patch status of a dry run from the check-update output."""
        pkg_lines = [l for l in yum_out.splitlines()
                     if l.split() and "." in l.split()[0]
                     and not any(x in l for x in ["kB","bps","00:","Updat"])]
        if pkg_lines:
            s["patch_status"] = "UPDATE_AVAIL"
            s["patch_detail"] = " | ".join(pkg_lines)[:200]
            self._log_patch(f" ⚠ Updates disponibles : {len(pkg_lines)} package(s)", "warn")
        else:
            s["patch_status"] = "UP_TO_DATE"
            s["patch_detail"] = "Deja a jour"
            self._log_patch(" ✅ A jour", "ok")

    def classify_real(s, yum_out, ver_before, ver_after):
        """Set the patch status of a real run from versions and yum output."""
        updated = [p for p in pkg_names if ver_before.get(p) != ver_after.get(p)]
        if updated:
            s["patch_status"] = "PATCHED"
            s["patch_detail"] = " | ".join(
                f"{p}: {ver_before.get(p,'?')} -> {ver_after.get(p,'?')}"
                for p in updated)[:200]
            self._log_patch(f" ✅ PATCHÉ : {s['patch_detail'][:80]}", "ok")
        elif "Nothing to do" in yum_out or "No packages" in yum_out:
            # Checked before "Complete!": dnf prints both when nothing changes.
            s["patch_status"] = "UP_TO_DATE"
            s["patch_detail"] = "Deja a jour"
            self._log_patch(" ✅ Déjà à jour", "ok")
        elif "Complete!" in yum_out:
            s["patch_status"] = "PATCHED"
            s["patch_detail"] = "Packages mis a jour"
            self._log_patch(f" ✅ PATCHÉ : {s['patch_detail'][:80]}", "ok")
        else:
            # No success marker at all: do not pretend the server is up to date.
            non_empty = [l.strip() for l in yum_out.splitlines() if l.strip()]
            s["patch_status"] = "KO"
            s["patch_detail"] = (non_empty[-1] if non_empty
                                 else "Resultat yum indetermine")[:200]
            self._log_patch(f" ❌ Echec ou resultat indetermine : {s['patch_detail'][:80]}", "ko")

    def restore_ui():
        """Unlock the buttons and show the end-of-run summary (Tk thread)."""
        self.go_btn.configure(state="normal")
        self.stop_btn.configure(state="disabled")
        self._patch_done_summary()

    try:
        for idx, s in enumerate(servers):
            if not self.running:
                break

            server = s["server"]
            self._log_patch(f"\n[{idx+1}/{total}] ── {server} ──", "info")

            # Build the yum command
            yum_cmd = build_yum_command(s["domain"], packages, exclude_kernel, dryrun)

            # Command confirmation (unless "apply to all" was already chosen)
            if not self.apply_all_cmd:
                action, chosen_cmd = ask_confirmation(server, yum_cmd)
                if action not in ("apply", "apply_all"):
                    # "cancel", or dialog closed without an explicit choice.
                    self._log_patch(" ↩ Serveur ignoré par l'utilisateur", "warn")
                    s["status"] = "IGNORE"
                    self._update_tree_status(s)
                    self._update_progress(idx+1, total)
                    continue
                yum_cmd = chosen_cmd
                if action == "apply_all":
                    self.apply_all_cmd = True
                    approved_cmd = yum_cmd
            elif approved_cmd:
                yum_cmd = approved_cmd

            self._log_patch(" Connexion...", "info")
            t0 = time.time()
            client, eff_host = ssh_connect(
                server, s["env"], self.settings,
                self.pkey, self.pkey2, self.cyb_pwd)

            if client is None:
                dur = round(time.time() - t0, 1)
                self._log_patch(f" ❌ Connexion impossible ({dur}s)", "ko")
                s["status"] = "AUTH_KO"
                self.results.append({**s, "duration": f"{dur}s",
                                     "patch_status":"KO","patch_detail":"Connexion impossible"})
                self._update_tree_status(s)
                self._update_progress(idx+1, total)
                continue

            try:
                self._log_patch(f" ✅ Connecté : {eff_host}", "ok")

                # Physical server detection
                s["is_physical"] = detect_physical(client)
                if s["is_physical"]:
                    self._log_patch(" 🖥 Serveur physique détecté", "warn")

                # Pre-patching
                self._log_patch(" Pre-patching...", "info")
                pre_out, _ = run_cmd(client, PRE_PATCH_SCRIPT, 30)
                if "PRE_PATCH_OK" in pre_out:
                    self._log_patch(" Pre-patch OK", "ok")
                else:
                    self._log_patch(" Pre-patch incomplet", "warn")

                # Flux Libre: snapshot of the Podman pods
                if "flux libre" in s.get("domain", "").lower():
                    self._log_patch(" Flux Libre : snapshot pods Podman...", "info")
                    fl_out, _ = run_cmd(client, FL_PRE_PATCH_SCRIPT, 30)
                    if "FL_PRE_OK" in fl_out:
                        for line in fl_out.splitlines():
                            if line.startswith(("FL_USER=", "FL_PODS_SAVED=")):
                                self._log_patch(f" {line}", "ok")
                    elif "FL_NO_PODS" in fl_out:
                        self._log_patch(" Pas de pods Podman detectes", "info")
                    else:
                        self._log_patch(" Snapshot pods incomplet", "warn")

                # Package versions before
                ver_before = rpm_versions(client)

                # yum execution
                self._log_patch(f" 🔧 Commande : {yum_cmd[:80]}...", "cmd")
                yum_out, _ = run_cmd(client, yum_cmd, 600)
                yum_out = yum_out or ""
                for line in yum_out.splitlines()[-10:]:
                    self._log_patch(f" {line}", "info")

                # Package versions after
                ver_after = rpm_versions(client)

                dur = round(time.time() - t0, 1)

                # Decide the patch status
                if dryrun:
                    classify_dryrun(s, yum_out)
                else:
                    classify_real(s, yum_out, ver_before, ver_after)

                # Reboot needed?
                reboot_out, _ = run_cmd(client, "needs-restarting -r 2>/dev/null; echo RC:$?", 15)
                s["reboot_required"] = "RC:1" in reboot_out
                if s["reboot_required"]:
                    self._log_patch(" ⚠ REBOOT REQUIS", "warn")

                s["status"] = "OK"
                s["duration"] = f"{dur}s"
            except Exception as exc:
                # Keep going with the next server instead of killing the thread.
                self._log_patch(f" ❌ Erreur pendant le patch : {exc}", "ko")
                s["status"] = "KO"
                s["patch_status"] = "KO"
                s["patch_detail"] = str(exc)[:200]
                s["duration"] = f"{round(time.time() - t0, 1)}s"
            finally:
                client.close()

            self.results.append(deepcopy(s))
            self._update_tree_status(s)
            self._update_progress(idx+1, total)
    finally:
        # End of run: always unlock the UI, whatever happened above.
        self.running = False
        self.root.after(0, restore_ui)
|
|
|
|
def _update_tree_status(self, s):
    """Schedule, on the Tk loop, the refresh of the status cell of server ``s``.

    The row is identified by ``s["server"]``; nothing happens if the tree
    has no such row.  The row tag is "ok" when the status is "OK" and the
    patch status is not "KO", "ko" for "AUTH_KO", "normal" otherwise.
    """
    def apply_status():
        tree = self.srv_tree
        row_id = s["server"]
        if not tree.exists(row_id):
            return
        cells = list(tree.item(row_id, "values"))
        cells[9] = s.get("status", "—")  # the status lives in column 9
        if s.get("status") == "OK" and s.get("patch_status") != "KO":
            tag = "ok"
        elif s.get("status") == "AUTH_KO":
            tag = "ko"
        else:
            tag = "normal"
        tree.item(row_id, values=cells, tags=(tag,))

    self.root.after(0, apply_status)
|
|
|
|
def _update_progress(self, done, total):
    """Schedule, on the Tk loop, the refresh of the progress bar and its label.

    Args:
        done: number of servers processed so far.
        total: total number of servers of the run (must not be 0).
    """
    percent = (done / total) * 100
    label_text = f"{done}/{total} ({percent:.0f}%)"

    def refresh():
        self.prog_var.set(percent)
        self.prog_lbl.configure(text=label_text)

    self.root.after(0, refresh)
|
|
|
|
def _patch_done_summary(self):
    """Show the end-of-run summary, then generate and display the report tab.

    Counters are computed from ``self.results``; a second warning is shown
    when at least one server needs a reboot.
    """
    results = self.results

    def count(predicate):
        """Number of results for which ``predicate`` is true."""
        return len([r for r in results if predicate(r)])

    ok_c = count(lambda r: r.get("status") == "OK")
    p_c = count(lambda r: r.get("patch_status") == "PATCHED")
    rb_c = count(lambda r: r.get("reboot_required"))
    ko_c = count(lambda r: r.get("status") != "OK")

    summary_lines = (
        "Patch termine !\n\n",
        f"Total traite : {len(results)}\n",
        f"OK : {ok_c}\n",
        f"Patche : {p_c}\n",
        f"Reboot requis: {rb_c}\n",
        f"KO/Auth : {ko_c}",
    )
    messagebox.showinfo("Patch termine", "".join(summary_lines))

    if rb_c > 0:
        messagebox.showwarning(
            "Reboot requis",
            f"{rb_c} serveur(s) necessitent un reboot.\n"
            "Utiliser l'onglet Post-patching > Reboot.")

    # Generate the report, then switch to the report tab shortly after.
    self.root.after(500, self._generate_report)
    self.root.after(600, lambda: self.nb.select(self.tab_report))
|
|
|
|
# ==========================================================================
|
|
# ONGLET 3 — POST-PATCHING
|
|
# ==========================================================================
|
|
def _post_patch_check(self):
    """Start the post-patch checks, in a daemon thread, on the selected servers.

    Only selected servers whose "accord" is "oui" are checked; a warning is
    shown when there is none.
    """
    targets = [srv for srv in self.servers
               if srv.get("selected") and srv["accord"] == "oui"]
    if targets:
        worker = threading.Thread(target=self._post_worker, args=(targets,),
                                  daemon=True)
        worker.start()
        return
    messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
|
|
|
|
def _post_worker(self, servers):
    """Worker thread: run the post-patch script on each server and log it.

    Output lines are logged with a level chosen from their content.  For
    servers whose domain contains "flux libre", the Podman pod script is
    run as well.  A failure on one server is logged and does not stop the
    others; the SSH client is always closed.

    Args:
        servers: list of server dicts (reads "server", "env", "domain").
    """
    def level_for(line):
        """Log level of one line of the post-patch script output."""
        lowered = line.lower()
        if "ALERTE" in line or "disparu" in lowered:
            return "ko"
        if "WARN" in line or "nouveau" in lowered:
            return "warn"
        if "OK" in line:
            return "ok"
        return "info"

    def log_pod_line(line):
        """Translate one marker line of the Flux Libre script into a log entry."""
        if "FL_RESTART=" in line:
            pod = line.split("=", 1)[1]
            self._log_post(f" Redemarrage pod: {pod}", "warn")
        elif "FL_ALL_OK" in line:
            self._log_post(" Tous les pods sont actifs", "ok")
        elif "FL_RESTARTED=" in line:
            n = line.split("=", 1)[1]
            self._log_post(f" {n} pod(s) redemarres", "warn")
        elif "FL_USER=" in line:
            self._log_post(f" {line}", "info")
        elif "FL_NO_PODS" in line:
            self._log_post(" Pas de pods Podman", "info")

    for s in servers:
        self._log_post(f"\n── {s['server']} ──", "info")
        client, eff = ssh_connect(s["server"], s["env"], self.settings,
                                  self.pkey, self.pkey2, self.cyb_pwd)
        if not client:
            self._log_post(" ❌ Connexion impossible", "ko")
            continue

        try:
            out, _ = run_cmd(client, POST_PATCH_SCRIPT, 60)
            for line in out.splitlines():
                self._log_post(f" {line}", level_for(line))

            # Flux Libre: check and restart the pods
            if "flux libre" in s.get("domain", "").lower():
                self._log_post(" Flux Libre : verification pods Podman...", "info")
                fl_out, _ = run_cmd(client, FL_POST_PATCH_SCRIPT, 60)
                for line in fl_out.splitlines():
                    log_pod_line(line)
        except Exception as exc:
            # One failing server must not abort the checks of the others.
            self._log_post(f" ❌ Erreur post-patch : {exc}", "ko")
        finally:
            client.close()
|
|
|
|
def _ask_mark_patched(self):
    """Ask for confirmation, then mark the patched servers green in Excel.

    Candidates are looked up in this order: selected servers with patch
    status "PATCHED", then "PATCHED" entries of ``self.results``, then
    (after a warning) every selected server whose "accord" is "oui".
    """
    def is_patched(entry):
        return entry.get("patch_status") == "PATCHED"

    patched = [srv for srv in self.servers
               if is_patched(srv) and srv.get("selected")]
    if not patched:
        patched = [res for res in self.results if is_patched(res)]
    if not patched:
        messagebox.showwarning(
            "Attention",
            "Aucun serveur patche detecte.\n"
            "Selectionner manuellement les serveurs a marquer.")
        patched = [srv for srv in self.servers
                   if srv.get("selected") and srv["accord"] == "oui"]
    if not patched:
        return

    # At most 10 names are listed; the remainder is summarised.
    shown = patched[:10]
    names = "\n".join(entry["server"] for entry in shown)
    hidden = len(patched) - 10
    if hidden > 0:
        names += f"\n... et {hidden} autres"

    confirmed = messagebox.askyesno(
        "Marquer comme patches dans Excel",
        f"Marquer {len(patched)} serveur(s) en VERT dans le fichier Excel ?\n\n{names}\n\nCette action modifie le fichier Excel original.",
        icon="question")
    if confirmed:
        self._mark_patched_excel(patched)
|
|
|
|
def _delete_old_snapshots(self):
    """Prompt for vSphere credentials and an age, then delete old snapshots.

    The deletion itself is done by ``_snap_delete_worker`` in a daemon
    thread, which removes snapshots OLDER than the chosen number of days on
    the selected servers whose "accord" is "oui".  The credential
    StringVars are stored on the instance as ``vsd_user`` and ``vsd_pwd``.
    """
    if not VSPHERE_OK:
        messagebox.showwarning("pyVmomi manquant",
            "pyVmomi non installe.\npy -m pip install pyVmomi --proxy http://proxy.sanef.fr:8080")
        return

    selected = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
    if not selected:
        messagebox.showwarning("Attention", "Aucun serveur selectionne.")
        return

    # vSphere credentials prompt
    dlg = tk.Toplevel(self.root)
    dlg.title("Supprimer snapshots vSphere")
    dlg.geometry("400x220")
    dlg.configure(bg=self.BG)
    dlg.grab_set()

    tk.Label(dlg, text="Supprimer snapshots > 3 jours",
             bg=self.BG, fg=self.ACCENT,
             font=("Consolas",12,"bold")).pack(pady=10)

    for lbl, attr, show in [("Utilisateur :", "vsd_user", ""),
                            ("Mot de passe :", "vsd_pwd", "●")]:
        f = tk.Frame(dlg, bg=self.BG)
        f.pack(fill="x", padx=20, pady=4)
        tk.Label(f, text=lbl, bg=self.BG, fg=self.FG,
                 font=("Consolas",10), width=14).pack(side="left")
        var = tk.StringVar()
        setattr(self, attr, var)
        tk.Entry(f, textvariable=var, show=show, bg=self.BG2, fg=self.FG,
                 font=("Consolas",10), insertbackground=self.FG).pack(side="left", fill="x", expand=True)

    days_var = tk.IntVar(value=3)
    fd = tk.Frame(dlg, bg=self.BG)
    fd.pack(fill="x", padx=20, pady=4)
    # ">" and not "<": the worker deletes snapshots older than this age.
    tk.Label(fd, text="Supprimer snaps > (jours) :", bg=self.BG, fg=self.FG,
             font=("Consolas",10), width=24).pack(side="left")
    tk.Spinbox(fd, from_=1, to=30, textvariable=days_var, width=5,
               bg=self.BG2, fg=self.FG).pack(side="left")

    def do_delete():
        """Validate the form, close the dialog and start the worker thread."""
        try:
            days = int(days_var.get())
        except (tk.TclError, ValueError):
            # The spinbox accepts free text: refuse anything that is not a number.
            messagebox.showwarning("Attention", "Nombre de jours invalide.")
            return
        if days < 1:
            messagebox.showwarning("Attention", "Nombre de jours invalide.")
            return
        user = self.vsd_user.get().strip()
        pwd = self.vsd_pwd.get().strip()
        dlg.destroy()
        threading.Thread(target=self._snap_delete_worker,
                         args=(selected, user, pwd, days), daemon=True).start()

    tk.Button(dlg, text="🗑 Supprimer", bg="#e64553", fg="white",
              font=("Consolas",11,"bold"), pady=6,
              command=do_delete).pack(pady=10)
|
|
|
|
def _snap_delete_worker(self, servers, vs_user, vs_pwd, max_days):
    """Worker thread: delete PrePatch snapshots older than ``max_days`` days.

    All reachable vCenters of VSPHERE_HOSTS are connected first.  For each
    server, the first vCenter on which the VM is found and has snapshots is
    processed: snapshots named ``PrePatch_*`` whose age in whole days is
    strictly greater than ``max_days`` are removed (children are kept);
    more recent ones are kept, and snapshots with another name are never
    touched.

    Args:
        servers: list of server dicts (reads "server").
        vs_user: vSphere user name.
        vs_pwd: vSphere password.
        max_days: age threshold, in days.
    """
    import ssl
    from datetime import timezone

    snap_prefix = "PrePatch_"

    def flatten(snap_list):
        """Return every snapshot of the tree, each parent before its children."""
        flat = []
        for snap in snap_list or []:
            flat.append(snap)
            flat.extend(flatten(snap.childSnapshotList))
        return flat

    def age_in_days(snap):
        """Age of the snapshot in whole days; naive timestamps are read as UTC."""
        created = snap.createTime
        if created.tzinfo is None:
            created = created.replace(tzinfo=timezone.utc)
        return (datetime.now(tz=timezone.utc) - created).days

    def remove(snap):
        """Remove one snapshot (children kept) and log the outcome."""
        try:
            task = snap.snapshot.RemoveSnapshot_Task(removeChildren=False)
            # Poll the task until vSphere reports a final state.
            while task.info.state not in ("success","error"):
                time.sleep(1)
            if task.info.state == "success":
                self._log_post(f" OK supprime : {snap.name}", "ok")
            else:
                self._log_post(f" ERREUR suppression : {snap.name}", "ko")
        except Exception as e:
            self._log_post(f" ERREUR : {e}", "ko")

    connected_vcs = []
    try:
        for vc in VSPHERE_HOSTS:
            try:
                # NOTE: certificate verification is disabled for the vCenter connection.
                ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                si = SmartConnect(host=vc, user=vs_user, pwd=vs_pwd,
                                  sslContext=ctx, connectionPoolTimeout=10)
                connected_vcs.append((vc, si, None))
                vc_content = si.RetrieveContent()
                connected_vcs[-1] = (vc, si, vc_content)
                self._log_post(f"Connecte vCenter : {vc}", "ok")
            except Exception as e:
                self._log_post(f"Inaccessible : {vc} — {str(e)[:50]}", "ko")

        usable_vcs = [entry for entry in connected_vcs if entry[2] is not None]
        if not usable_vcs:
            self._log_post("Aucun vCenter joignable", "ko")
            return

        for s in servers:
            for vc, si, vc_content in usable_vcs:
                try:
                    vm = self._find_vm(vc_content, s["server"])
                    if not vm:
                        continue
                    self._log_post(f"\n VM : {vm.name} sur {vc}", "info")

                    if not vm.snapshot:
                        self._log_post(" Aucun snapshot", "info")
                        continue

                    for snap in flatten(vm.snapshot.rootSnapshotList):
                        if not snap.name.startswith(snap_prefix):
                            # Not created by the pre-patch step: never delete it.
                            self._log_post(
                                f" Snapshot : {snap.name} — ignoré (hors {snap_prefix})", "info")
                            continue
                        age_days = age_in_days(snap)
                        if age_days > max_days:
                            # Older than max_days days: delete
                            self._log_post(
                                f" Snapshot : {snap.name} ({age_days}j) — SUPPRESSION (>{max_days}j)", "warn")
                            remove(snap)
                        else:
                            # Recent (<= max_days days): keep
                            self._log_post(
                                f" Snapshot : {snap.name} ({age_days}j) — conservé (<= {max_days}j)", "info")
                    break
                except Exception as e:
                    # One failing VM must not abort the other servers.
                    self._log_post(f" ERREUR : {e}", "ko")
    finally:
        # Always release every opened session, including after an error.
        for vc, si, _ in connected_vcs:
            try:
                Disconnect(si)
            except Exception:
                pass  # best effort: the session may already be gone
|
|
|
|
def _remove_old_kernels(self):
    """Remove the old kernels on the selected servers, after confirmation.

    ``package-cleanup --oldkernels --count=1`` is run over SSH, in a daemon
    thread, on every selected server whose "accord" is "oui".  A failure on
    one server is logged and does not stop the others; the SSH client is
    always closed.
    """
    targets = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
    if not targets:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
        return
    if not messagebox.askyesno("Confirmation",
            f"Supprimer les anciens kernels sur {len(targets)} serveur(s) ?"):
        return

    def worker():
        for s in targets:
            self._log_post(f"\n── {s['server']} — Suppression anciens kernels ──", "info")
            client, _ = ssh_connect(s["server"], s["env"], self.settings,
                                    self.pkey, self.pkey2, self.cyb_pwd)
            if not client:
                self._log_post(" ❌ Connexion impossible", "ko")
                continue
            try:
                out, _ = run_cmd(client, "sudo package-cleanup --oldkernels --count=1 -y 2>&1 | tail -10", 60)
                for line in out.splitlines():
                    self._log_post(f" {line}", "ok" if "Removed" in line else "info")
            except Exception as exc:
                # Keep going with the next server.
                self._log_post(f" ❌ Erreur : {exc}", "ko")
            finally:
                client.close()

    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def _undo_yum(self):
    """Show the yum history of the first selected server and undo a transaction.

    The history is fetched over SSH in a daemon thread, then the user is
    asked for the transaction id.  The id is validated before being placed
    in the ``sudo yum history undo`` command line: only a number, ``last``
    or ``last-N`` is accepted.  SSH clients are always closed.
    """
    import re

    self.audit("UNDO_YUM", "")
    target = next((s for s in self.servers
                   if s.get("selected") and s["accord"]=="oui"), None)
    if not target:
        messagebox.showwarning("Attention", "Sélectionner au moins un serveur.")
        return

    self._log_post(f"\n── {target['server']} — Historique yum ──", "info")

    def connect():
        """Open an SSH client on the target server (None on failure)."""
        client, _ = ssh_connect(target["server"], target["env"], self.settings,
                                self.pkey, self.pkey2, self.cyb_pwd)
        return client

    def do_undo(tid):
        """Worker thread: undo the transaction ``tid`` and log the output."""
        self._log_post(f" Undo transaction {tid}...", "warn")
        c2 = connect()
        if not c2:
            self._log_post(" ❌ Connexion impossible", "ko")
            return
        try:
            out2, _ = run_cmd(c2, f"sudo yum history undo {tid} -y 2>&1 | tail -15", 120)
            for line in out2.splitlines():
                self._log_post(f" {line}", "ok" if "Complete" in line else "info")
        except Exception as exc:
            self._log_post(f" ❌ Erreur : {exc}", "ko")
        finally:
            c2.close()

    def worker():
        client = connect()
        if not client:
            self._log_post(" ❌ Connexion impossible", "ko")
            return
        try:
            out, _ = run_cmd(client, "sudo yum history list 2>/dev/null | head -20", 20)
        except Exception as exc:
            self._log_post(f" ❌ Erreur : {exc}", "ko")
            return
        finally:
            client.close()
        self._log_post(out, "info")

        # Ask for the id to undo (dialogs must run on the Tk thread)
        def ask():
            tid = simpledialog.askstring("Undo yum",
                f"Historique yum sur {target['server']} :\n\n{out}\n\nID de transaction à annuler :")
            tid = (tid or "").strip()
            if not tid:
                return
            # The id ends up in a sudo shell command: accept nothing else
            # than the forms understood by "yum history undo".
            if not re.fullmatch(r"\d+|last(-\d+)?", tid):
                messagebox.showwarning("Attention", f"ID de transaction invalide : {tid}")
                return
            threading.Thread(target=do_undo, args=(tid,), daemon=True).start()

        self.root.after(0, ask)

    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
def _reboot_servers(self):
    """Schedule a reboot (``shutdown -r +1``) on servers, after confirmation.

    Targets are the selected servers flagged ``reboot_required`` whose
    "accord" is "oui"; when there is none, every selected server with
    "accord" "oui" is proposed instead.  A failure on one server is logged
    and does not stop the others; the SSH client is always closed.
    """
    self.audit("REBOOT_START", "")
    reboot_list = [s for s in self.servers
                   if s.get("selected") and s.get("reboot_required") and s["accord"]=="oui"]
    if not reboot_list:
        reboot_list = [s for s in self.servers if s.get("selected") and s["accord"]=="oui"]
    if not reboot_list:
        messagebox.showwarning("Attention", "Aucun serveur sélectionné.")
        return

    # At most 10 names are listed; say so when the list is truncated.
    names = "\n".join(s["server"] for s in reboot_list[:10])
    if len(reboot_list) > 10:
        names += f"\n... et {len(reboot_list)-10} autres"
    if not messagebox.askyesno("⚠️ Confirmation reboot",
            f"Redémarrer {len(reboot_list)} serveur(s) ?\n\n{names}", icon="warning"):
        return

    def worker():
        for s in reboot_list:
            self._log_post(f"\n── {s['server']} — Reboot ──", "warn")
            client, _ = ssh_connect(s["server"], s["env"], self.settings,
                                    self.pkey, self.pkey2, self.cyb_pwd)
            if not client:
                self._log_post(" ❌ Connexion impossible", "ko")
                continue
            try:
                run_cmd(client, "sudo shutdown -r +1 'Reboot post-patching' &", 10)
            except Exception as exc:
                # Keep going with the next server.
                self._log_post(f" ❌ Erreur : {exc}", "ko")
                continue
            finally:
                client.close()
            self._log_post(" ✅ Reboot planifié dans 1 min", "ok")

    threading.Thread(target=worker, daemon=True).start()
|
|
|
|
# ==========================================================================
|
|
# ONGLET 4 — RAPPORT
|
|
# ==========================================================================
|
|
def _generate_report(self):
    """Build the visual report: KPI widgets, detail table and raw text log.

    Everything is derived from ``self.results``; a warning is shown and
    nothing is changed when there is no result.
    """
    results = self.results
    if not results:
        messagebox.showwarning("Attention", "Aucun résultat de patch disponible.")
        return

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    week = date.today().isocalendar()[1]

    # ── KPI computation ──────────────────────────────────────────────────
    def count(predicate):
        """Number of results for which ``predicate`` is true."""
        return len([r for r in results if predicate(r)])

    total = len(results)
    ok_c = count(lambda r: r.get("status") == "OK")
    p_c = count(lambda r: r.get("patch_status") == "PATCHED")
    uptodate = count(lambda r: r.get("patch_status") == "UP_TO_DATE")
    rb_c = count(lambda r: r.get("reboot_required"))
    ko_c = count(lambda r: r.get("status") != "OK")

    # Update the KPI widgets
    kpi_values = (
        ("total", total),
        ("ok", ok_c),
        ("patched", p_c),
        ("uptodate", uptodate),
        ("reboot", rb_c),
        ("ko", ko_c),
    )
    for key, value in kpi_values:
        self.kpi_widgets[key].configure(text=str(value))

    # ── Detail table ─────────────────────────────────────────────────────
    tree = self.report_tree
    for item in tree.get_children():
        tree.delete(item)

    def row_tag(r):
        """Colour tag of one table row."""
        if r.get("status") != "OK":
            return "ko"
        if r.get("patch_status") == "PATCHED":
            return "patched"
        if r.get("reboot_required"):
            return "reboot"
        if r.get("patch_status") == "UP_TO_DATE":
            return "uptodate"
        return "ok"

    for r in results:
        if r.get("snap_done"):
            snap = "OK"
        elif r.get("is_physical"):
            snap = "PHY"
        else:
            snap = "—"
        row = (
            r.get("server",""),
            r.get("env",""),
            r.get("domain",""),
            "OUI" if r.get("accord") == "oui" else "NON",
            snap,
            r.get("status","?"),
            r.get("patch_status","?"),
            "OUI" if r.get("reboot_required") else "—",
            r.get("patch_detail","")[:60],
        )
        tree.insert("", "end", values=row, tags=(row_tag(r),))

    # ── Raw log ──────────────────────────────────────────────────────────
    report = self.report_txt
    report.configure(state="normal")
    report.delete("1.0", "end")

    def w(txt, tag="info"):
        """Append one line to the raw report."""
        report.insert("end", txt+"\n", tag)

    separator = f"{'='*70}"
    w(separator, "title")
    w(f" RAPPORT PATCHING — Semaine {week} — {now}", "title")
    w(f" Fichier : {self.excel_var.get()}", "info")
    w(f" Intervenant : {self.settings.get('patcher','—')}", "info")
    w(separator, "title")
    w("")
    w(f" RESUME : {total} traites | {ok_c} OK | {p_c} patches | {rb_c} reboot | {ko_c} KO", "info")
    w("")
    w(f" {'SERVEUR':<35} {'STATUT':<12} {'PATCH':<16} {'REBOOT'}", "title")
    w(f" {'-'*66}", "info")

    for r in results:
        status = r.get("status","?")
        ps = r.get("patch_status","?")
        rb = "REBOOT" if r.get("reboot_required") else ""
        # A pending reboot takes precedence over the patched/ok colour.
        if r.get("reboot_required"):
            tag = "warn"
        elif status == "OK" or r.get("patch_status") == "PATCHED":
            tag = "ok"
        else:
            tag = "ko"
        w(f" {r.get('server',''):<35} {status:<12} {ps:<16} {rb}", tag)
        if r.get("patch_detail"):
            w(f" → {r['patch_detail'][:80]}", "info")

    w("")
    w(separator, "title")
    report.configure(state="disabled")
|
|
|
|
def _mark_patched_excel(self, servers_to_mark):
    """Fill the rows of the given servers in green in the Excel file.

    The workbook is edited on a temporary copy, then copied back over the
    original.  When the original is locked (PermissionError), the edited
    copy is saved next to it under a ``_patched_<timestamp>`` name, keeping
    the original extension.  The temporary directory is always removed.

    Args:
        servers_to_mark: server dicts; the row comes from "excel_row" when
            present, otherwise from the first row whose first cell equals
            "server".
    """
    import shutil
    import tempfile

    filepath = self.excel_var.get().strip()
    sheet = self.sheet_var.get()
    if not filepath or not os.path.exists(filepath):
        messagebox.showerror("Erreur Excel", "Fichier Excel introuvable")
        return

    tmp_dir = None
    try:
        from openpyxl import load_workbook
        from openpyxl.styles import PatternFill

        # Work on a temporary copy to stay clear of the OneDrive lock.
        tmp_dir = tempfile.mkdtemp()
        tmp_path = os.path.join(tmp_dir, os.path.basename(filepath))
        shutil.copy2(filepath, tmp_path)

        wb = load_workbook(tmp_path)
        try:
            ws = wb[sheet]
            green_fill = PatternFill(start_color="00B050", end_color="00B050", fill_type="solid")
            last_column = ws.max_column
            rows_by_name = None  # first-column index, built on first use

            marked = 0
            for s in servers_to_mark:
                row_num = s.get("excel_row")
                if not row_num:
                    if rows_by_name is None:
                        # Scan the sheet once; the first occurrence of a name wins.
                        rows_by_name = {}
                        for row in ws.iter_rows(min_col=1, max_col=1):
                            if row:
                                rows_by_name.setdefault(row[0].value, row[0].row)
                    row_num = rows_by_name.get(s["server"])
                if row_num:
                    for col in range(1, last_column + 1):
                        ws.cell(row=row_num, column=col).fill = green_fill
                    marked += 1

            # Save into the temporary copy
            wb.save(tmp_path)
        finally:
            wb.close()

        # Try to copy back over the original
        try:
            shutil.copy2(tmp_path, filepath)
            self._log_patch(f" {marked} ligne(s) marquee(s) en vert dans Excel", "ok")
            messagebox.showinfo("Excel mis a jour",
                f"{marked} serveur(s) marques comme patches (fond vert) dans :\n{filepath}")
        except PermissionError:
            # The file is locked (OneDrive): save next to it instead.
            base, ext = os.path.splitext(filepath)
            backup_path = f"{base}_patched_{datetime.now():%Y%m%d_%H%M}{ext}"
            shutil.copy2(tmp_path, backup_path)
            self._log_patch(f" Excel verrouille, sauvegarde dans : {backup_path}", "warn")
            messagebox.showwarning("Excel verrouille",
                f"Le fichier Excel est verrouille (OneDrive).\n\n"
                f"Une copie a ete sauvegardee :\n{backup_path}\n\n"
                f"Fermez le fichier original puis copiez la version patchee.")

        self.audit("MARK_EXCEL", f"{marked} serveurs marques, sheet={sheet}")

    except Exception as e:
        messagebox.showerror("Erreur Excel", f"Impossible de modifier l'Excel :\n{e}")
    finally:
        # Temp cleanup, best effort, also after an error.
        if tmp_dir is not None:
            shutil.rmtree(tmp_dir, ignore_errors=True)
|
|
|
|
def _export_csv(self):
    """Write ``self.results`` to a timestamped CSV file and show its path.

    The file is created under ``~/Documents/patch_reports``. Only the columns
    listed below are exported; any other key of a result dict is ignored.
    """
    if not self.results:
        messagebox.showwarning("Attention","Aucun résultat.")
        return

    report_dir = os.path.join(os.path.expanduser("~"), "Documents", "patch_reports")
    os.makedirs(report_dir, exist_ok=True)

    file_name = f"patch_S{date.today().isocalendar()[1]}_{datetime.now():%Y%m%d_%H%M%S}.csv"
    csv_file = os.path.join(report_dir, file_name)

    columns = [
        "server", "env", "domain", "app", "accord", "status", "patch_status",
        "patch_detail", "reboot_required", "snap_done", "is_physical",
    ]
    with open(csv_file, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        for result in self.results:
            writer.writerow(result)

    messagebox.showinfo("Export CSV", f"Fichier : {csv_file}")
|
|
|
|
def _export_dokuwiki(self):
    """Write ``self.results`` as a DokuWiki-formatted report and show its path.

    The report is created under ``~/Documents/patch_reports`` and contains a
    summary table followed by one detail row per result.

    The whole text is assembled before the file is opened, and every field is
    read with ``.get``, so an incomplete result dict can no longer abort the
    export halfway and leave a truncated file behind.
    """
    if not self.results:
        messagebox.showwarning("Attention","Aucun résultat.")
        return

    rep_dir = os.path.join(os.path.expanduser("~"),"Documents","patch_reports")
    os.makedirs(rep_dir, exist_ok=True)
    week = date.today().isocalendar()[1]
    wiki_file = os.path.join(rep_dir, f"patch_wiki_S{week}_{datetime.now():%Y%m%d}.txt")

    p_c = sum(1 for r in self.results if r.get("patch_status")=="PATCHED")
    rb_c = sum(1 for r in self.results if r.get("reboot_required"))
    ko_c = sum(1 for r in self.results if r.get("status")!="OK")

    lines = [
        f"====== Rapport Patching — Semaine {week} ======\n\n",
        f"//Généré le {datetime.now():%d/%m/%Y %H:%M} — MYPCZEN / SANEF DSI-SOC//\n\n",
        "===== Résumé =====\n\n",
        "^ Serveurs traités ^ Patchés ^ Reboot requis ^ KO ^\n",
        f"| {len(self.results)} | {p_c} | {rb_c} | {ko_c} |\n\n",
        "===== Détail =====\n\n",
        "^ Serveur ^ Env ^ Statut ^ Patch ^ Reboot ^ Accord ^ Snapshot ^\n",
    ]
    for r in self.results:
        rb = "⚠ OUI" if r.get("reboot_required") else "NON"
        lines.append(
            f"| {r.get('server','')} | {r.get('env','')} | {r.get('status','?')} | "
            f"{r.get('patch_status','?')} | {rb} | "
            f"{'OUI' if r.get('accord')=='oui' else 'NON'} | "
            f"{'OUI' if r.get('snap_done') else 'NON'} |\n"
        )

    with open(wiki_file,"w",encoding="utf-8") as f:
        f.writelines(lines)
    messagebox.showinfo("Export DokuWiki", f"Fichier : {wiki_file}")
|
|
|
|
# ==========================================================================
|
|
# ONGLET 5 — AUDIT
|
|
# ==========================================================================
|
|
def _build_tab_audit(self):
    """Build the audit tab: a header bar with actions and the log table.

    Creates ``self.audit_tree`` and fills it once via ``_refresh_audit``.
    """
    tab = self.tab_audit
    fg, accent, btn_bg = self.FG, self.ACCENT, self.BTN

    # Header bar: title on the left, action buttons stacked from the right.
    header = tk.Frame(tab, bg="#181825", pady=6)
    header.pack(fill="x")
    tk.Label(header, text="Journal d'audit", bg="#181825", fg=accent,
             font=("Consolas", 13, "bold")).pack(side="left", padx=12)

    # (text, background, foreground, command, horizontal padding)
    header_buttons = (
        ("Rafraichir", btn_bg, fg, self._refresh_audit, 12),
        ("Exporter CSV", btn_bg, fg, self._export_audit_csv, 4),
        ("Changer mon mot de passe", "#fab387", "#1e1e2e", self._change_my_password, 4),
    )
    for text, button_bg, button_fg, command, padx in header_buttons:
        tk.Button(header, text=text, bg=button_bg, fg=button_fg,
                  font=("Consolas", 9), command=command).pack(side="right", padx=padx)

    # (column id, heading text, width)
    layout = (
        ("timestamp", "Date/Heure", 160),
        ("user", "Utilisateur", 120),
        ("action", "Action", 150),
        ("details", "Details", 500),
    )
    self.audit_tree = ttk.Treeview(tab, columns=tuple(col for col, _, _ in layout),
                                   show="headings")
    for col, title, width in layout:
        self.audit_tree.heading(col, text=title)
        self.audit_tree.column(col, width=width)

    scrollbar = ttk.Scrollbar(tab, orient="vertical", command=self.audit_tree.yview)
    self.audit_tree.configure(yscrollcommand=scrollbar.set)
    self.audit_tree.pack(side="left", fill="both", expand=True, padx=6, pady=4)
    scrollbar.pack(side="right", fill="y", pady=4)

    self._refresh_audit()
|
|
|
|
def _refresh_audit(self):
    """Reload the audit table with the 500 entries returned by the database.

    A missing or empty ``details`` value is shown as an empty cell, so a
    stored ``None`` is no longer rendered as the text "None".
    """
    tree = self.audit_tree
    tree.delete(*tree.get_children())
    for log in self.db.get_logs(500):
        tree.insert("", "end", values=(
            log["timestamp"], log["username"], log["action"],
            log.get("details") or ""))
|
|
|
|
def _export_audit_csv(self):
    """Export up to 5000 audit entries to a CSV file chosen by the user.

    The file is semicolon-delimited and written as UTF-8 with a BOM. Nothing
    is written when there are no entries or when the save dialog is cancelled.
    The export itself is recorded in the audit log.
    """
    logs = self.db.get_logs(5000)
    if not logs:
        messagebox.showinfo("Info", "Aucun log")
        return

    suggested_name = f"audit_patch_{datetime.now():%Y%m%d_%H%M%S}.csv"
    target = filedialog.asksaveasfilename(
        defaultextension=".csv",
        filetypes=[("CSV", "*.csv")],
        initialfile=suggested_name,
    )
    if not target:
        return

    with open(target, "w", newline="", encoding="utf-8-sig") as handle:
        writer = csv.writer(handle, delimiter=";")
        writer.writerow(["Timestamp", "Utilisateur", "Action", "Details"])
        writer.writerows(
            [entry["timestamp"], entry["username"], entry["action"], entry.get("details", "")]
            for entry in logs
        )
    self.audit("EXPORT_AUDIT", f"{len(logs)} entries -> {target}")
|
|
|
|
def _change_my_password(self):
    """Open the change-password dialog for the logged-in user.

    A confirmation message is shown only when the dialog reports success.
    """
    dialog = ChangePasswordDialog(self.root, self.db, self.current_user["username"])
    if not dialog.success:
        return
    messagebox.showinfo("OK", "Mot de passe modifie avec succes")
|
|
|
|
# ==========================================================================
|
|
# ONGLET 6 — UTILISATEURS (admin only)
|
|
# ==========================================================================
|
|
def _build_tab_users(self):
    """Build the user-management tab: creation form, user table, actions.

    Creates ``self.new_user_var``, ``self.new_pwd_var``, ``self.new_role_var``
    and ``self.users_tree``, then fills the table via ``_refresh_users``.
    """
    tab = self.tab_users
    bg, entry_bg, fg = self.BG, self.BG2, self.FG
    accent, btn_bg = self.ACCENT, self.BTN
    form_font = ("Consolas", 10)

    # Title bar
    title_bar = tk.Frame(tab, bg="#181825", pady=6)
    title_bar.pack(fill="x")
    tk.Label(title_bar, text="Gestion des utilisateurs", bg="#181825", fg=accent,
             font=("Consolas", 13, "bold")).pack(side="left", padx=12)

    # Creation form: user name, password, role, "create" button
    form = tk.Frame(tab, bg=bg, pady=6)
    form.pack(fill="x", padx=12)

    def add_label(text, padx):
        tk.Label(form, text=text, bg=bg, fg=fg, font=form_font).pack(side="left", padx=padx)

    def add_entry(variable, **extra):
        tk.Entry(form, textvariable=variable, bg=entry_bg, fg=fg, font=form_font,
                 width=15, insertbackground=fg, **extra).pack(side="left", padx=4)

    add_label("Utilisateur :", (0, 4))
    self.new_user_var = tk.StringVar()
    add_entry(self.new_user_var)

    add_label("MDP :", (8, 4))
    self.new_pwd_var = tk.StringVar()
    add_entry(self.new_pwd_var, show="*")

    add_label("Role :", (8, 4))
    self.new_role_var = tk.StringVar(value="operator")
    tk.OptionMenu(form, self.new_role_var, "admin", "operator", "viewer").pack(side="left", padx=4)

    tk.Button(form, text="Creer", bg="#40a02b", fg="white", font=("Consolas", 9, "bold"),
              command=self._create_user).pack(side="left", padx=8)

    # User table: (column id, heading text, width)
    layout = (
        ("username", "Utilisateur", 150),
        ("role", "Role", 100),
        ("locked", "Verrouille", 100),
        ("must_change", "Chg. MDP", 100),
        ("last_login", "Derniere connexion", 180),
    )
    self.users_tree = ttk.Treeview(tab, columns=tuple(col for col, _, _ in layout),
                                   show="headings")
    for col, title, width in layout:
        self.users_tree.heading(col, text=title)
        self.users_tree.column(col, width=width)
    self.users_tree.pack(fill="both", expand=True, padx=12, pady=6)

    # Action bar: (text, background, foreground, font, command, side)
    actions = tk.Frame(tab, bg=bg, pady=6)
    actions.pack(fill="x", padx=12)
    bold_font = ("Consolas", 9, "bold")
    plain_font = ("Consolas", 9)
    action_buttons = (
        ("Supprimer", "#f38ba8", "#1e1e2e", bold_font, self._delete_user, "left"),
        ("Deverrouiller", "#fab387", "#1e1e2e", bold_font, self._unlock_user, "left"),
        ("Reset MDP", btn_bg, fg, plain_font, self._reset_user_pwd, "left"),
        ("Rafraichir", btn_bg, fg, plain_font, self._refresh_users, "right"),
    )
    for text, button_bg, button_fg, font, command, side in action_buttons:
        tk.Button(actions, text=text, bg=button_bg, fg=button_fg,
                  font=font, command=command).pack(side=side, padx=4)

    self._refresh_users()
|
|
|
|
def _refresh_users(self):
    """Reload the user table from the database.

    The ``locked`` and ``must_change_pwd`` flags are shown as "OUI" or an
    empty cell; a missing last login is shown as an empty cell.
    """
    tree = self.users_tree
    tree.delete(*tree.get_children())
    for account in self.db.list_users():
        locked_text = "OUI" if account["locked"] else ""
        must_change_text = "OUI" if account["must_change_pwd"] else ""
        last_login_text = account["last_login"] or ""
        row = (account["username"], account["role"],
               locked_text, must_change_text, last_login_text)
        tree.insert("", "end", values=row)
|
|
|
|
def _create_user(self):
    """Create a user from the form fields and refresh the table.

    Name and password are stripped of surrounding whitespace; both must be
    non-empty. On success the creation is audited and the name and password
    fields are cleared. When the database refuses the creation an error
    dialog is shown instead.
    """
    username = self.new_user_var.get().strip()
    password = self.new_pwd_var.get().strip()
    role = self.new_role_var.get()

    if not (username and password):
        messagebox.showwarning("Erreur", "Saisir un nom et un mot de passe")
        return

    if not self.db.create_user(username, password, role):
        messagebox.showerror("Erreur", f"L'utilisateur '{username}' existe deja")
        return

    self.audit("CREATE_USER", f"{username} (role={role})")
    self.new_user_var.set("")
    self.new_pwd_var.set("")
    self._refresh_users()
|
|
|
|
def _delete_user(self):
    """Delete the user selected in the table, after confirmation.

    Does nothing when no row is selected. The ``admin`` account and the
    account of the logged-in user are refused, so the running session can
    never be left without an account. The deletion is audited and the table
    refreshed.
    """
    sel = self.users_tree.selection()
    if not sel:
        return
    # Read the cell through set(): item(...)["values"] converts numeric-looking
    # text to int (e.g. "007" -> 7), which would target the wrong user name.
    un = str(self.users_tree.set(sel[0], "username"))
    if un == "admin":
        messagebox.showwarning("Erreur", "Impossible de supprimer admin")
        return
    if un == self.current_user["username"]:
        messagebox.showwarning("Erreur", "Impossible de supprimer votre propre compte")
        return
    if messagebox.askyesno("Confirmer", f"Supprimer '{un}' ?"):
        self.db.delete_user(un)
        self.audit("DELETE_USER", un)
        self._refresh_users()
|
|
|
|
def _unlock_user(self):
    """Unlock the user selected in the table, audit it and refresh the table.

    Does nothing when no row is selected.
    """
    sel = self.users_tree.selection()
    if not sel:
        return
    # Read the cell through set(): item(...)["values"] converts numeric-looking
    # text to int (e.g. "007" -> 7), which would target the wrong user name.
    un = str(self.users_tree.set(sel[0], "username"))
    self.db.unlock_user(un)
    self.audit("UNLOCK_USER", un)
    self._refresh_users()
|
|
|
|
def _reset_user_pwd(self):
    """Ask for a new password for the selected user and store it.

    Does nothing when no row is selected or when the prompt is cancelled or
    left empty. A successful reset is audited and the table refreshed.
    """
    sel = self.users_tree.selection()
    if not sel:
        return
    # Read the cell through set(): item(...)["values"] converts numeric-looking
    # text to int (e.g. "007" -> 7), which would target the wrong user name.
    un = str(self.users_tree.set(sel[0], "username"))
    new_pwd = simpledialog.askstring("Reset", f"Nouveau mot de passe pour {un} :", show="*")
    if new_pwd:
        self.db.reset_password(un, new_pwd)
        self.audit("RESET_PASSWORD", un)
        self._refresh_users()
|
|
|
|
|
|
# ==============================================================================
|
|
# MAIN
|
|
# ==============================================================================
|
|
if __name__ == "__main__":
    # Open the user database and create the root window, kept hidden while
    # the login dialog is displayed.
    db = Database()
    root = tk.Tk()
    root.withdraw()

    # No login result: leave without ever showing the main window.
    login = LoginDialog(root, db)
    user = login.result
    if not user:
        root.destroy()
        sys.exit(0)

    # Accounts flagged ``must_change_pwd`` have to pick a new password before
    # going any further; leaving the dialog without success ends the program.
    if user.get("must_change_pwd"):
        dlg = ChangePasswordDialog(root, db, user["username"], forced=True)
        if not dlg.success:
            root.destroy()
            sys.exit(0)

    # Authenticated: reveal the window and start the application.
    root.deiconify()
    app = PatchManagerV2(root, db, user)
    root.mainloop()