patchcenter/app/routers/patch_history.py

236 lines
10 KiB
Python

"""Router Historique patching — vue unifiee patch_history + quickwin_entries"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user
from ..config import APP_NAME
# Router object on which this module's endpoints are registered.
# NOTE(review): presumably included into the main application elsewhere — not visible here.
router = APIRouter()
# Template engine used to render the HTML pages of this router.
# NOTE(review): "app/templates" is a relative path, so it is presumably
# resolved against the process working directory — confirm how the app is launched.
templates = Jinja2Templates(directory="app/templates")
def _parse_int_param(raw, default=None):
    """Convert a digits-only query-string value to ``int``.

    Returns ``default`` when ``raw`` is empty, contains anything other than
    digits, or is rejected by ``int()``. The last case matters because
    ``str.isdigit()`` accepts characters (e.g. the superscript "²") that
    ``int()`` cannot convert; without the guard such input raises
    ``ValueError`` inside the request handler.
    """
    if not raw or not raw.isdigit():
        return default
    try:
        return int(raw)
    except ValueError:
        return default


@router.get("/patching/historique", response_class=HTMLResponse)
async def patch_history_page(request: Request, db=Depends(get_db),
                             year: str = Query(""), week: str = Query(""),
                             hostname: str = Query(""), source: str = Query(""),
                             os_family: str = Query(""), zone: str = Query(""),
                             domain: str = Query(""), intervenant: str = Query(""),
                             page: str = Query("1")):
    """Render the unified patching history page.

    Combines rows from ``patch_history`` with patched ``quickwin_entries``
    for the selected year, computes yearly KPIs plus per-source and per-week
    counts, and returns one page (100 rows) of the filtered listing.

    All query parameters arrive as strings and are normalised here: blank or
    non-numeric values fall back to defaults instead of being rejected.

    Args:
        request: Incoming request, used for authentication and templating.
        db: Database session provided by ``get_db``.
        year: Year to display; defaults to the current year.
        week: Optional week-number filter.
        hostname: Optional case-insensitive substring filter on the hostname.
        source: Optional source filter: ``import``, ``standard`` or ``quickwin``.
        os_family: Optional exact filter on the server OS family.
        zone: Optional exact filter on the zone name.
        domain: Optional exact filter on the domain name.
        intervenant: Optional exact filter on ``patch_history.intervenant_name``.
        page: 1-based page number; values below 1 are treated as 1.

    Returns:
        A redirect to ``/login`` when ``get_current_user`` yields no user,
        otherwise the rendered ``patch_history.html`` template.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")

    from datetime import datetime

    # --- Normalise query parameters -------------------------------------
    year = _parse_int_param(year, datetime.now().year)
    week = _parse_int_param(week)
    # Clamp to 1: "page=0" would otherwise yield a negative OFFSET, which is
    # an error in PostgreSQL (the dialect these queries are written in).
    page = max(1, _parse_int_param(page, 1))
    hostname = hostname.strip() or None
    source = source.strip() or None
    os_family = os_family.strip() or None
    zone = zone.strip() or None
    domain = domain.strip() or None
    intervenant = intervenant.strip() or None
    per_page = 100
    offset = (page - 1) * per_page

    # --- Yearly KPIs (not affected by the listing filters) --------------
    kpis = {}
    kpis["total_ph"] = db.execute(text(
        "SELECT COUNT(*) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y"
    ), {"y": year}).scalar()
    kpis["total_qw"] = db.execute(text("""
        SELECT COUNT(*) FROM quickwin_entries qe
        JOIN quickwin_runs qr ON qe.run_id=qr.id
        WHERE qe.status='patched' AND qr.year=:y
    """), {"y": year}).scalar()
    kpis["total"] = kpis["total_ph"] + kpis["total_qw"]
    # Distinct servers patched through either source during the year.
    kpis["servers"] = db.execute(text("""
        SELECT COUNT(DISTINCT sid) FROM (
            SELECT server_id AS sid FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y
            UNION
            SELECT qe.server_id FROM quickwin_entries qe
            JOIN quickwin_runs qr ON qe.run_id=qr.id
            WHERE qe.status='patched' AND qr.year=:y
        ) u
    """), {"y": year}).scalar()
    kpis["patchables"] = db.execute(text(
        "SELECT COUNT(*) FROM servers WHERE etat='Production' AND patch_os_owner='secops'"
    )).scalar()
    # Patchable servers with no patch recorded in either source this year.
    kpis["never"] = db.execute(text("""
        SELECT COUNT(*) FROM servers s
        WHERE s.etat='Production' AND s.patch_os_owner='secops'
        AND NOT EXISTS (SELECT 1 FROM patch_history ph
            WHERE ph.server_id=s.id AND EXTRACT(YEAR FROM ph.date_patch)=:y)
        AND NOT EXISTS (SELECT 1 FROM quickwin_entries qe
            JOIN quickwin_runs qr ON qe.run_id=qr.id
            WHERE qe.server_id=s.id AND qe.status='patched' AND qr.year=:y)
    """), {"y": year}).scalar()
    # Guard against division by zero when there is no patchable server.
    kpis["coverage_pct"] = round((kpis["servers"] / kpis["patchables"] * 100), 1) if kpis["patchables"] else 0

    # --- Breakdown by source --------------------------------------------
    by_source = {}
    by_source["import"] = db.execute(text(
        "SELECT COUNT(*) FROM patch_history WHERE campaign_id IS NULL AND EXTRACT(YEAR FROM date_patch)=:y"
    ), {"y": year}).scalar()
    by_source["standard"] = db.execute(text("""
        SELECT COUNT(*) FROM patch_history ph
        JOIN campaigns c ON ph.campaign_id=c.id
        WHERE c.campaign_type='standard' AND EXTRACT(YEAR FROM ph.date_patch)=:y
    """), {"y": year}).scalar()
    by_source["quickwin"] = kpis["total_qw"]

    # --- Breakdown by week ----------------------------------------------
    # Per-week distinct-server counts are computed per source and then summed,
    # so a server patched through both sources in one week counts twice.
    # NOTE(review): week numbers here are ISO weeks ('IW' / EXTRACT(WEEK)) while
    # the year filter uses the calendar year, so dates in the first or last
    # days of a year may show up under week 52/53 or 01 — confirm this is intended.
    by_week = db.execute(text("""
        SELECT week_num, SUM(cnt)::int as servers FROM (
            SELECT TO_CHAR(date_patch, 'IW') as week_num, COUNT(DISTINCT server_id) as cnt
            FROM patch_history
            WHERE EXTRACT(YEAR FROM date_patch)=:y
            GROUP BY TO_CHAR(date_patch, 'IW')
            UNION ALL
            SELECT LPAD(qr.week_number::text, 2, '0') as week_num, COUNT(DISTINCT qe.server_id) as cnt
            FROM quickwin_entries qe
            JOIN quickwin_runs qr ON qe.run_id=qr.id
            WHERE qe.status='patched' AND qr.year=:y
            GROUP BY qr.week_number
        ) u GROUP BY week_num ORDER BY week_num
    """), {"y": year}).fetchall()

    # --- Option lists for the filters -----------------------------------
    # These lists are not restricted to the selected year.
    filter_opts = {}
    filter_opts["os"] = [r.os for r in db.execute(text("""
        SELECT DISTINCT s.os_family as os FROM servers s
        WHERE s.os_family IS NOT NULL AND s.os_family <> ''
        ORDER BY 1
    """)).fetchall()]
    filter_opts["zones"] = [r.zone for r in db.execute(text("""
        SELECT DISTINCT z.name as zone FROM zones z ORDER BY 1
    """)).fetchall()]
    filter_opts["domains"] = [r.dom for r in db.execute(text("""
        SELECT DISTINCT d.name as dom FROM domains d ORDER BY 1
    """)).fetchall()]
    filter_opts["intervenants"] = [r.interv for r in db.execute(text("""
        SELECT DISTINCT intervenant_name as interv FROM patch_history
        WHERE intervenant_name IS NOT NULL AND intervenant_name <> ''
        ORDER BY 1
    """)).fetchall()]

    # --- Build the WHERE clauses of the filtered listing ----------------
    # Only fixed SQL fragments are appended; every user-supplied value is
    # passed as a bound parameter.
    where_ph = ["EXTRACT(YEAR FROM ph.date_patch)=:y"]
    where_qw = ["qr.year=:y", "qe.status='patched'"]
    params = {"y": year, "limit": per_page, "offset": offset}
    if week:
        where_ph.append("EXTRACT(WEEK FROM ph.date_patch)=:wk")
        where_qw.append("qr.week_number=:wk")
        params["wk"] = week
    if hostname:
        where_ph.append("s.hostname ILIKE :h")
        where_qw.append("s.hostname ILIKE :h")
        params["h"] = f"%{hostname}%"
    if os_family:
        where_ph.append("s.os_family=:os")
        where_qw.append("s.os_family=:os")
        params["os"] = os_family
    if zone:
        where_ph.append("z.name=:zn")
        where_qw.append("z.name=:zn")
        params["zn"] = zone
    if domain:
        where_ph.append("d.name=:dm")
        where_qw.append("d.name=:dm")
        params["dm"] = domain
    if intervenant:
        where_ph.append("ph.intervenant_name=:iv")
        where_qw.append("1=0")  # quickwin entries have no intervenant field
        params["iv"] = intervenant
    if source == "import":
        where_ph.append("ph.campaign_id IS NULL")
    elif source == "standard":
        where_ph.append("c.campaign_type='standard'")
    wc_ph = " AND ".join(where_ph)
    wc_qw = " AND ".join(where_qw)

    # A source or intervenant filter can exclude one side of the union entirely.
    skip_qw = source in ("import", "standard") or bool(intervenant)
    skip_ph = source == "quickwin"

    ph_joins = """
        JOIN servers s ON ph.server_id=s.id
        LEFT JOIN zones z ON s.zone_id=z.id
        LEFT JOIN domain_environments de ON s.domain_env_id=de.id
        LEFT JOIN domains d ON de.domain_id=d.id
        LEFT JOIN campaigns c ON ph.campaign_id=c.id
    """
    qw_joins = """
        JOIN quickwin_runs qr ON qe.run_id=qr.id
        JOIN servers s ON qe.server_id=s.id
        LEFT JOIN zones z ON s.zone_id=z.id
        LEFT JOIN domain_environments de ON s.domain_env_id=de.id
        LEFT JOIN domains d ON de.domain_id=d.id
    """

    # --- Total number of rows matching the filters ----------------------
    count_parts = []
    if not skip_ph:
        count_parts.append(f"SELECT COUNT(*) FROM patch_history ph {ph_joins} WHERE {wc_ph}")
    if not skip_qw:
        count_parts.append(f"SELECT COUNT(*) FROM quickwin_entries qe {qw_joins} WHERE {wc_qw}")
    count_sql = " + ".join(f"({p})" for p in count_parts) if count_parts else "0"
    total_filtered = db.execute(text(f"SELECT {count_sql}"), params).scalar()

    # --- One page of the combined listing -------------------------------
    # Both SELECTs expose the same column list so they can be UNIONed.
    union_parts = []
    if not skip_ph:
        union_parts.append(f"""
            SELECT s.id as sid, s.hostname, s.os_family, s.etat,
                ph.date_patch, ph.status, ph.notes, ph.intervenant_name,
                z.name as zone, d.name as domain_name,
                CASE WHEN ph.campaign_id IS NULL THEN 'import'
                    ELSE COALESCE(c.campaign_type, 'standard') END as source_type,
                c.id as campaign_id, c.label as campaign_label,
                NULL::int as run_id, NULL::text as run_label
            FROM patch_history ph {ph_joins}
            WHERE {wc_ph}
        """)
    if not skip_qw:
        union_parts.append(f"""
            SELECT s.id as sid, s.hostname, s.os_family, s.etat,
                qe.patch_date as date_patch, qe.status, qe.notes,
                NULL::text as intervenant_name,
                z.name as zone, d.name as domain_name,
                'quickwin' as source_type,
                NULL::int as campaign_id, NULL::text as campaign_label,
                qr.id as run_id, qr.label as run_label
            FROM quickwin_entries qe {qw_joins}
            WHERE {wc_qw}
        """)
    if not union_parts:
        # Both sides were excluded (e.g. source=quickwin plus an intervenant
        # filter): use a typed, always-empty SELECT so the outer query stays valid.
        union_parts.append("""SELECT NULL::int as sid, NULL as hostname, NULL as os_family, NULL as etat,
            NULL::timestamptz as date_patch, NULL as status, NULL as notes, NULL as intervenant_name,
            NULL as zone, NULL as domain_name, NULL as source_type,
            NULL::int as campaign_id, NULL as campaign_label, NULL::int as run_id, NULL as run_label
            WHERE 1=0""")
    union_sql = " UNION ALL ".join(union_parts)
    rows = db.execute(text(f"""
        SELECT * FROM ({union_sql}) combined
        ORDER BY date_patch DESC NULLS LAST
        LIMIT :limit OFFSET :offset
    """), params).fetchall()

    # --- Years for which any data exists --------------------------------
    years = db.execute(text("""
        SELECT DISTINCT y FROM (
            SELECT EXTRACT(YEAR FROM date_patch)::int as y FROM patch_history
            UNION
            SELECT year as y FROM quickwin_runs
        ) u ORDER BY y DESC
    """)).fetchall()

    return templates.TemplateResponse("patch_history.html", {
        "request": request, "user": user, "app_name": APP_NAME,
        "kpis": kpis, "by_week": by_week, "by_source": by_source,
        "rows": rows, "year": year, "week": week, "hostname": hostname,
        "source": source, "os_family": os_family, "zone": zone,
        "domain": domain, "intervenant": intervenant,
        "filter_opts": filter_opts, "page": page, "per_page": per_page,
        "total_filtered": total_filtered, "years": [y.y for y in years],
    })