patchcenter/app/services/quickwin_service.py
Khalid MOUTAOUAKIL 5cc10c5b6c Module QuickWin complet + filtres serveurs OS/owner
- QuickWin: campagnes patching rapide avec exclusions générales (OS/reboot) et spécifiques (applicatifs)
- Config serveurs: pagination, filtres (search, env, domain, zone, per_page), dry run, bulk edit
- Détail campagne: pagination hprod/prod séparée, filtres (search, status, domain), section prod masquée si hprod non terminé
- Auth: redirection qw_only vers /quickwin, profil lecture seule quickwin
- Serveurs: filtres OS (Linux/Windows) et Owner (secops/ipop/na), exclusion EOL
- Sidebar: lien QuickWin conditionné sur permission campaigns ou quickwin

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 16:27:45 +02:00

255 lines
11 KiB
Python

"""Service QuickWin — gestion des campagnes + exclusions par serveur"""
import json
from sqlalchemy import text
# Default general exclusions (reboot packages + middleware/apps), stored as a
# single space-separated string of package glob patterns.
DEFAULT_GENERAL_EXCLUDES = " ".join([
    "dbus*", "dracut*", "glibc*", "grub2*", "kernel*", "kexec-tools*",
    "libselinux*", "linux-firmware*", "microcode_ctl*", "mokutil*",
    "net-snmp*", "NetworkManager*", "network-scripts*", "nss*", "openssl-libs*",
    "polkit*", "selinux-policy*", "shim*", "systemd*", "tuned*",
])
def get_server_configs(db, server_ids=None):
    """Return QuickWin server configs joined with server/domain/env/zone names.

    Args:
        db: object exposing ``execute()`` (the result must offer ``fetchall()``).
        server_ids: optional iterable of server ids used to restrict the
            result. When it is ``None`` or empty, every config row is returned.

    Returns:
        The ``fetchall()`` result: all columns of ``quickwin_server_config``
        plus ``hostname``, ``os_family``, ``tier``, ``domaine``,
        ``environnement`` and ``zone``, ordered by hostname.
    """
    # Materialise the ids once so tuples, sets and generators are accepted too.
    # NOTE(review): presumably the driver adapts a Python list to an SQL array
    # for ANY(:ids) — confirm against the configured driver.
    ids = list(server_ids) if server_ids else []

    # Single query text for both cases; only the WHERE clause is optional.
    # The interpolated fragment is a constant, never caller-provided data.
    where_clause = "WHERE qc.server_id = ANY(:ids)" if ids else ""
    query = text(f"""
        SELECT qc.*, s.hostname, s.os_family, s.tier,
               d.name as domaine, e.name as environnement,
               z.name as zone
        FROM quickwin_server_config qc
        JOIN servers s ON qc.server_id = s.id
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN zones z ON s.zone_id = z.id
        {where_clause}
        ORDER BY s.hostname
    """)

    if ids:
        result = db.execute(query, {"ids": ids})
    else:
        result = db.execute(query)
    return result.fetchall()
def upsert_server_config(db, server_id, general_excludes=None, specific_excludes="", notes=""):
    """Create or update the QuickWin config row of one server, then commit.

    Default handling for ``general_excludes`` differs between the two paths:

    * update (a row already exists): only ``None`` is replaced by
      ``DEFAULT_GENERAL_EXCLUDES``; an empty string is stored as-is.
    * create (no row yet): any falsy value (``None`` or ``""``) is replaced
      by ``DEFAULT_GENERAL_EXCLUDES``.

    ``specific_excludes`` and ``notes`` are always written as given.
    """
    lookup = text("SELECT id FROM quickwin_server_config WHERE server_id = :sid")
    row_exists = db.execute(lookup, {"sid": server_id}).fetchone()

    if row_exists:
        fall_back_to_default = general_excludes is None
        statement = text("""
            UPDATE quickwin_server_config
            SET general_excludes = :ge, specific_excludes = :se, notes = :n, updated_at = now()
            WHERE server_id = :sid
        """)
    else:
        fall_back_to_default = not general_excludes
        statement = text("""
            INSERT INTO quickwin_server_config (server_id, general_excludes, specific_excludes, notes)
            VALUES (:sid, :ge, :se, :n)
        """)

    effective_general = DEFAULT_GENERAL_EXCLUDES if fall_back_to_default else general_excludes
    db.execute(statement, {
        "sid": server_id,
        "ge": effective_general,
        "se": specific_excludes,
        "n": notes,
    })
    db.commit()
def delete_server_config(db, config_id):
    """Delete the quickwin_server_config row whose id is ``config_id`` and commit."""
    statement = text("DELETE FROM quickwin_server_config WHERE id = :id")
    params = {"id": config_id}
    db.execute(statement, params)
    db.commit()
def get_eligible_servers(db):
    """Return the servers eligible for QuickWin.

    Eligible means: ``os_family = 'linux'``, ``etat = 'en_production'`` and
    ``patch_os_owner = 'secops'``. Each row also carries the domain and
    environment name/code and the server's QuickWin excludes (empty string
    when the server has no config row). Rows are ordered by environment
    display order, domain display order, then hostname.
    """
    query = text("""
        SELECT s.id, s.hostname, s.os_family, s.os_version, s.machine_type,
               s.tier, s.etat, s.patch_excludes, s.is_flux_libre, s.is_podman,
               d.name as domaine, d.code as domain_code,
               e.name as environnement, e.code as env_code,
               COALESCE(qc.general_excludes, '') as qw_general_excludes,
               COALESCE(qc.specific_excludes, '') as qw_specific_excludes
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN quickwin_server_config qc ON qc.server_id = s.id
        WHERE s.os_family = 'linux'
          AND s.etat = 'en_production'
          AND s.patch_os_owner = 'secops'
        ORDER BY e.display_order, d.display_order, s.hostname
    """)
    result = db.execute(query)
    return result.fetchall()
# -- Runs --
def list_runs(db):
    """Return every QuickWin run with its creator name and entry counters.

    Each row holds all ``quickwin_runs`` columns, ``created_by_name`` and
    five per-run counts computed by sub-selects on ``quickwin_entries``:
    total, patched, failed, hprod and prod. Rows are sorted by year, week
    number and id, all descending.
    """
    query = text("""
        SELECT r.*,
               u.display_name as created_by_name,
               (SELECT COUNT(*) FROM quickwin_entries e WHERE e.run_id = r.id) as total_entries,
               (SELECT COUNT(*) FROM quickwin_entries e WHERE e.run_id = r.id AND e.status = 'patched') as patched_count,
               (SELECT COUNT(*) FROM quickwin_entries e WHERE e.run_id = r.id AND e.status = 'failed') as failed_count,
               (SELECT COUNT(*) FROM quickwin_entries e WHERE e.run_id = r.id AND e.branch = 'hprod') as hprod_count,
               (SELECT COUNT(*) FROM quickwin_entries e WHERE e.run_id = r.id AND e.branch = 'prod') as prod_count
        FROM quickwin_runs r
        LEFT JOIN users u ON r.created_by = u.id
        ORDER BY r.year DESC, r.week_number DESC, r.id DESC
    """)
    result = db.execute(query)
    return result.fetchall()
def get_run(db, run_id):
    """Return the run identified by ``run_id`` with its creator's display name.

    The value is whatever ``fetchone()`` yields for the query, i.e. a single
    row of ``quickwin_runs`` columns plus ``created_by_name``.
    """
    query = text("""
        SELECT r.*, u.display_name as created_by_name
        FROM quickwin_runs r LEFT JOIN users u ON r.created_by = u.id
        WHERE r.id = :id
    """)
    result = db.execute(query, {"id": run_id})
    return result.fetchone()
def get_run_entries(db, run_id):
    """Return all entries of a run, enriched with server information.

    Each row holds the ``quickwin_entries`` columns plus the server's
    hostname, OS family, machine type, domain name and environment name.
    Rows are ordered by branch, then hostname.
    """
    query = text("""
        SELECT qe.*, s.hostname, s.os_family, s.machine_type,
               d.name as domaine, e.name as environnement
        FROM quickwin_entries qe
        JOIN servers s ON qe.server_id = s.id
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE qe.run_id = :rid
        ORDER BY qe.branch, s.hostname
    """)
    result = db.execute(query, {"rid": run_id})
    return result.fetchall()
def create_run(db, year, week_number, label, user_id, server_ids, notes=""):
    """Create a QuickWin run and one entry per selected server, then commit.

    Each server is looked up individually; ids that match no server are
    silently skipped. The entry's branch is ``"prod"`` when the server's
    environment name contains ``"production"`` (case-insensitive), otherwise
    ``"hprod"``. The server's configured excludes are copied onto the entry,
    with ``DEFAULT_GENERAL_EXCLUDES`` used when the general excludes are empty
    or the server has no config row.

    Returns:
        The id of the newly inserted run.
    """
    insert_run = text("""
        INSERT INTO quickwin_runs (year, week_number, label, created_by, notes)
        VALUES (:y, :w, :l, :uid, :n) RETURNING id
    """)
    run_params = {"y": year, "w": week_number, "l": label, "uid": user_id, "n": notes}
    run_id = db.execute(insert_run, run_params).fetchone().id

    lookup_server = text("""
        SELECT s.id, e.name as env_name,
               COALESCE(qc.general_excludes, '') as ge,
               COALESCE(qc.specific_excludes, '') as se
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN quickwin_server_config qc ON qc.server_id = s.id
        WHERE s.id = :sid
    """)
    insert_entry = text("""
        INSERT INTO quickwin_entries (run_id, server_id, branch, general_excludes, specific_excludes)
        VALUES (:rid, :sid, :br, :ge, :se)
    """)

    for sid in server_ids:
        server = db.execute(lookup_server, {"sid": sid}).fetchone()
        if server:
            env_name = server.env_name
            # NOTE(review): substring match — any environment whose name merely
            # contains "production" lands in the prod branch; confirm this
            # matches the environment naming actually in use.
            is_prod = bool(env_name) and "production" in env_name.lower()
            db.execute(insert_entry, {
                "rid": run_id,
                "sid": sid,
                "br": "prod" if is_prod else "hprod",
                "ge": server.ge or DEFAULT_GENERAL_EXCLUDES,
                "se": server.se,
            })

    db.commit()
    return run_id
def delete_run(db, run_id):
    """Delete a run together with its entries, then commit.

    The entries are removed first, then the run row itself.
    """
    params = {"rid": run_id}
    statements = (
        "DELETE FROM quickwin_entries WHERE run_id = :rid",
        "DELETE FROM quickwin_runs WHERE id = :rid",
    )
    for statement in statements:
        db.execute(text(statement), params)
    db.commit()
def update_entry_status(db, entry_id, status, patch_output="", packages_count=0,
                        packages="", reboot_required=False, notes=""):
    """Record the patching outcome of one entry, then commit.

    Overwrites status, patch output, package count/list, reboot flag and
    notes of the entry. ``patch_date`` is set to ``now()`` only when the new
    status is ``'patched'`` or ``'failed'``; otherwise it is left unchanged.
    ``updated_at`` is always refreshed.
    """
    statement = text("""
        UPDATE quickwin_entries SET
            status = :st, patch_output = :po, patch_packages_count = :pc,
            patch_packages = :pp, reboot_required = :rb, notes = :n,
            patch_date = CASE WHEN :st IN ('patched','failed') THEN now() ELSE patch_date END,
            updated_at = now()
        WHERE id = :id
    """)
    params = {
        "id": entry_id,
        "st": status,
        "po": patch_output,
        "pc": packages_count,
        "pp": packages,
        "rb": reboot_required,
        "n": notes,
    }
    db.execute(statement, params)
    db.commit()
def update_entry_field(db, entry_id, field, value):
    """Update a single column of one entry (used for inline editing).

    Only the whitelisted column names below may be written. For any other
    ``field`` the function returns ``False`` without touching the database;
    otherwise it runs the UPDATE (also refreshing ``updated_at``), commits
    and returns ``True``.
    """
    editable_columns = (
        "general_excludes", "specific_excludes", "notes", "status",
        "snap_done", "prereq_ok", "prereq_detail", "dryrun_output",
    )
    if field in editable_columns:
        # Interpolating the column name is safe here: it is guaranteed to be
        # one of the literal names listed above. The value stays a bind param.
        statement = text(f"UPDATE quickwin_entries SET {field} = :val, updated_at = now() WHERE id = :id")
        db.execute(statement, {"val": value, "id": entry_id})
        db.commit()
        return True
    return False
def can_start_prod(db, run_id):
    """Tell whether the prod branch of a run may start.

    Returns ``True`` when the run has no ``hprod`` entry whose status is
    still ``'pending'`` or ``'in_progress'``.
    """
    query = text("""
        SELECT COUNT(*) as cnt FROM quickwin_entries
        WHERE run_id = :rid AND branch = 'hprod' AND status IN ('pending', 'in_progress')
    """)
    unfinished = db.execute(query, {"rid": run_id}).fetchone()
    remaining = unfinished.cnt
    return remaining == 0
def get_run_stats(db, run_id):
    """Return the aggregated counters of a run as a single row.

    Columns: ``total``, ``hprod_total``, ``prod_total``, ``patched``,
    ``failed``, ``pending``, ``excluded``, ``skipped``, ``hprod_patched``,
    ``prod_patched`` and ``reboot_count`` (entries with ``reboot_required``).
    """
    query = text("""
        SELECT
            COUNT(*) as total,
            COUNT(*) FILTER (WHERE branch = 'hprod') as hprod_total,
            COUNT(*) FILTER (WHERE branch = 'prod') as prod_total,
            COUNT(*) FILTER (WHERE status = 'patched') as patched,
            COUNT(*) FILTER (WHERE status = 'failed') as failed,
            COUNT(*) FILTER (WHERE status = 'pending') as pending,
            COUNT(*) FILTER (WHERE status = 'excluded') as excluded,
            COUNT(*) FILTER (WHERE status = 'skipped') as skipped,
            COUNT(*) FILTER (WHERE branch = 'hprod' AND status = 'patched') as hprod_patched,
            COUNT(*) FILTER (WHERE branch = 'prod' AND status = 'patched') as prod_patched,
            COUNT(*) FILTER (WHERE reboot_required) as reboot_count
        FROM quickwin_entries WHERE run_id = :rid
    """)
    result = db.execute(query, {"rid": run_id})
    return result.fetchone()
def inject_yum_history(db, data):
    """Store the yum history of servers in ``quickwin_server_config``.

    Args:
        db: object exposing ``execute()`` and ``commit()``.
        data: iterable of dicts shaped like
            ``{"server": "hostname", "yum_commands": [...]}``. The keys
            ``"server_name"`` and ``"last_yum_commands"`` are accepted as
            fallbacks. ``None`` is treated as an empty iterable.

    Items are skipped when the hostname is missing, blank, not a string, or
    matches no row in ``servers``. For the others, the command list is
    serialised to JSON and written to ``last_yum_commands``: the existing
    config row is updated, or a new one is inserted when the server has none.

    Returns:
        ``(updated, inserted)`` — the number of rows updated and inserted.
    """
    find_server = text("SELECT id FROM servers WHERE hostname = :h")
    find_config = text("SELECT id FROM quickwin_server_config WHERE server_id = :sid")
    # CAST(... AS jsonb) replaces the `:cmds::jsonb` shorthand: a `::` placed
    # directly after a bind-parameter name may be mis-parsed by text(), while
    # the standard CAST form is unambiguous and equivalent.
    update_config = text("""
        UPDATE quickwin_server_config
        SET last_yum_commands = CAST(:cmds AS jsonb), updated_at = now()
        WHERE server_id = :sid
    """)
    insert_config = text("""
        INSERT INTO quickwin_server_config (server_id, last_yum_commands)
        VALUES (:sid, CAST(:cmds AS jsonb))
    """)

    updated = 0
    inserted = 0
    for item in data or ():
        raw_name = item.get("server", item.get("server_name", ""))
        # A null or non-string hostname is skipped instead of crashing on .strip().
        hostname = raw_name.strip() if isinstance(raw_name, str) else ""
        if not hostname:
            continue

        srv = db.execute(find_server, {"h": hostname}).fetchone()
        if not srv:
            continue

        cmds = json.dumps(
            item.get("yum_commands", item.get("last_yum_commands", [])),
            ensure_ascii=False,
        )
        params = {"sid": srv.id, "cmds": cmds}
        if db.execute(find_config, {"sid": srv.id}).fetchone():
            db.execute(update_config, params)
            updated += 1
        else:
            db.execute(insert_config, params)
            inserted += 1

    db.commit()
    return updated, inserted