patchcenter/app/services/safe_patching_service.py
Khalid MOUTAOUAKIL 49d5658475 Safe Patching wizard, SSE terminal, SSH password fallback, Qualys VMDR testé
Safe Patching Quick Win:
- Wizard 4 steps: Prérequis → Snapshot → Exécution → Post-patch
- Step 1: vérif SSH/disque/satellite par branche, exclure les KO
- Step 2: snapshot vSphere VMs
- Step 3: commande yum éditable, lancer hprod puis prod (100% requis)
- Step 4: vérification post-patch, export CSV
- Terminal SSE live (Server-Sent Events) avec couleurs
- Exclusion serveurs par checkbox dans chaque branche
- Label auto Quick Win SXX YYYY

SSH:
- Fallback password depuis settings si clé SSH absente
- Détection auto root (id -u) → pas de sudo si déjà root
- Testé sur VM doli CentOS 7 (10.0.2.4)

Qualys VMDR:
- API 2.0 testée et fonctionnelle avec compte sanef-ae
- Knowledge Base (CVE/QID/packages) accessible
- Host Detections (vulns par host) accessible
- Migration vers API 4.0 à prévoir (EOL dans 85 jours)

Qualys Agent installé sur doli (activation perso qg2)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 06:49:31 +02:00

111 lines
4.7 KiB
Python

"""Service Safe Patching — Quick Win : patching sans interruption de service"""
from datetime import datetime
from sqlalchemy import text
# Packages that ALWAYS require a reboot once updated.
REBOOT_PACKAGES = [
    # kernel and its companion packages
    "kernel",
    "kernel-core",
    "kernel-modules",
    "kernel-tools",
    # C library
    "glibc",
    "glibc-common",
    "glibc-devel",
    # init system
    "systemd",
    "systemd-libs",
    "systemd-udev",
    # message bus
    "dbus",
    "dbus-libs",
    "dbus-daemon",
    # firmware / CPU microcode
    "linux-firmware",
    "microcode_ctl",
    # authorization framework
    "polkit",
    "polkit-libs",
    # tuning daemon
    "tuned",
]
# Standard exclusions (middleware / applications — never part of a safe run).
# Every entry is a glob pattern ending in "*".
STD_EXCLUDES = [
    "mongodb*",
    "mysql*",
    "postgres*",
    "mariadb*",
    "oracle*",
    "pgdg*",
    "php*",
    "java*",
    "redis*",
    "elasticsearch*",
    "nginx*",
    "mod_ssl*",
    "haproxy*",
    "certbot*",
    "python-certbot*",
    "docker*",
    "podman*",
    "centreon*",
    "qwserver*",
    "ansible*",
    "node*",
    "tina*",
    "memcached*",
    "nextcloud*",
    "pgbouncer*",
    "pgpool*",
    "pgbadger*",
    "psycopg2*",
    "barman*",
    "kibana*",
    "splunk*",
]
def build_safe_excludes():
    """Build the exclusion list used for safe patching.

    Returns:
        A new list made of every ``REBOOT_PACKAGES`` entry followed by every
        ``STD_EXCLUDES`` pattern with its ``*`` characters removed (bare
        package-name prefixes rather than globs).
    """
    bare_std_names = [pattern.replace("*", "") for pattern in STD_EXCLUDES]
    return [*REBOOT_PACKAGES, *bare_std_names]
def build_yum_command(extra_excludes=None):
    """Generate the "safe" ``yum update`` command line.

    The command excludes every ``REBOOT_PACKAGES`` and ``STD_EXCLUDES`` entry
    plus any caller-supplied extras. Each pattern is emitted as
    ``--exclude=<pattern>``; a trailing ``*`` is appended when the pattern
    does not already end with one.

    Args:
        extra_excludes: Optional additional package patterns. May be an
            iterable of strings or a single string. Each string is split on
            commas and whitespace, and blank pieces are dropped.

    Returns:
        The command as a single string: ``yum update --exclude=... -y``.

    NOTE(review): patterns are interpolated without shell quoting — confirm
    that callers only pass trusted values.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(extra_excludes, str):
        extra_excludes = [extra_excludes]

    # Work on a fresh list so the module-level constants are never mutated.
    patterns = list(REBOOT_PACKAGES) + list(STD_EXCLUDES)
    for raw in extra_excludes or ():
        # Splitting drops blank entries (which would become "--exclude=*" and
        # exclude every package) and keeps a stray token from ending up as a
        # positional package argument of "yum update".
        patterns.extend(raw.replace(",", " ").split())

    flags = [
        f"--exclude={pattern}" if pattern.endswith("*") else f"--exclude={pattern}*"
        for pattern in patterns
    ]
    exclude_str = " ".join(flags)
    return f"yum update {exclude_str} -y"
def create_quickwin_campaign(db, year, week_number, label, user_id, assistant_id=None):
    """Create a Quick Win campaign with its two branches (hprod + prod).

    Steps, in order:
      1. insert a ``campaigns`` row (status ``draft``, type ``quickwin``);
      2. select the eligible servers (``en_production``, owned by ``secops``,
         ``licence_support`` active or els, Linux);
      3. insert one ``pending`` patch session per server, assigned to
         ``user_id`` — servers whose environment is named ``Production`` get
         the third date returned by ``_week_dates``, all others the first;
      4. optionally register ``assistant_id`` in ``campaign_operator_limits``;
      5. store the session count in ``campaigns.total_servers`` and commit.

    Args:
        db: database session exposing ``execute`` and ``commit``.
        year: campaign year, forwarded to ``_week_dates``.
        week_number: week number; formatted as ``SXX`` for ``week_code``.
        label: campaign label.
        user_id: creator of the campaign and intervenant of every session.
        assistant_id: optional assistant; skipped when falsy.

    Returns:
        The id of the newly created campaign.
    """
    from .campaign_service import _week_dates

    # _week_dates is defined elsewhere; judging by the original variable names
    # its four results are the Monday..Thursday dates — TODO confirm.
    monday, _tuesday, wednesday, thursday = _week_dates(year, week_number)

    campaign_params = {
        "wc": f"S{week_number:02d}",
        "y": year,
        "label": label,
        "ds": monday,
        "de": thursday,
        "uid": user_id,
    }
    campaign_row = db.execute(text("""
        INSERT INTO campaigns (week_code, year, label, status, date_start, date_end,
        created_by, campaign_type)
        VALUES (:wc, :y, :label, 'draft', :ds, :de, :uid, 'quickwin')
        RETURNING id
    """), campaign_params).fetchone()
    campaign_id = campaign_row.id

    # Every Linux server in production state that secops patches.
    eligible_servers = db.execute(text("""
        SELECT s.id, s.hostname, e.name as env_name
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE s.etat = 'en_production' AND s.patch_os_owner = 'secops'
        AND s.licence_support IN ('active', 'els') AND s.os_family = 'linux'
        ORDER BY e.name, s.hostname
    """)).fetchall()

    insert_session = text("""
        INSERT INTO patch_sessions (campaign_id, server_id, status, date_prevue,
        intervenant_id, forced_assignment, assigned_at)
        VALUES (:cid, :sid, 'pending', :dp, :uid, true, now())
        ON CONFLICT (campaign_id, server_id) DO NOTHING
    """)
    for server in eligible_servers:
        # hprod branch on the first date, prod branch on the third one.
        if server.env_name == 'Production':
            planned_date = wednesday
        else:
            planned_date = monday
        db.execute(insert_session, {
            "cid": campaign_id,
            "sid": server.id,
            "dp": planned_date,
            "uid": user_id,
        })

    # Register the assistant when one was given.
    if assistant_id:
        db.execute(text("""
            INSERT INTO campaign_operator_limits (campaign_id, user_id, max_servers, note)
            VALUES (:cid, :aid, 0, 'Assistant Quick Win')
        """), {"cid": campaign_id, "aid": assistant_id})

    session_count = db.execute(
        text("SELECT COUNT(*) FROM patch_sessions WHERE campaign_id = :cid"),
        {"cid": campaign_id},
    ).scalar()
    db.execute(
        text("UPDATE campaigns SET total_servers = :c WHERE id = :cid"),
        {"c": session_count, "cid": campaign_id},
    )

    db.commit()
    return campaign_id
def get_quickwin_stats(db, campaign_id):
    """Return the Quick Win counters of a campaign, split by branch.

    A session belongs to the prod branch when its server's environment is
    named ``Production`` and to the hprod branch otherwise. Servers without
    any environment (NULL after the LEFT JOINs) are counted as hprod, which
    matches how ``create_quickwin_campaign`` schedules them. A plain ``!=``
    would evaluate to NULL for those rows and drop them from both branches,
    hence ``IS DISTINCT FROM``.

    Args:
        db: database session exposing ``execute``.
        campaign_id: id of the campaign to aggregate.

    Returns:
        A single row with the columns ``hprod_total``, ``hprod_patched``,
        ``hprod_failed``, ``prod_total``, ``prod_patched``, ``prod_failed``
        and ``excluded``. The ``*_total`` columns count sessions of every
        status, including ``excluded`` ones.
    """
    return db.execute(text("""
        SELECT
            COUNT(*) FILTER (WHERE e.name IS DISTINCT FROM 'Production') as hprod_total,
            COUNT(*) FILTER (WHERE e.name IS DISTINCT FROM 'Production' AND ps.status = 'patched') as hprod_patched,
            COUNT(*) FILTER (WHERE e.name IS DISTINCT FROM 'Production' AND ps.status = 'failed') as hprod_failed,
            COUNT(*) FILTER (WHERE e.name = 'Production') as prod_total,
            COUNT(*) FILTER (WHERE e.name = 'Production' AND ps.status = 'patched') as prod_patched,
            COUNT(*) FILTER (WHERE e.name = 'Production' AND ps.status = 'failed') as prod_failed,
            COUNT(*) FILTER (WHERE ps.status = 'excluded') as excluded
        FROM patch_sessions ps
        JOIN servers s ON ps.server_id = s.id
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE ps.campaign_id = :cid
    """), {"cid": campaign_id}).fetchone()