feat(qualys): dashboard vulnerabilites avec KPI + historique

This commit is contained in:
Pierre & Lumière 2026-04-24 23:49:46 +00:00
parent b06aedfc3b
commit 8f8e8c4d8f
6 changed files with 819 additions and 217 deletions

View File

@ -466,13 +466,21 @@ async def qualys_search(request: Request, db=Depends(get_db),
all_tags = db.execute(text("SELECT qualys_tag_id, name FROM qualys_tags ORDER BY name")).fetchall() all_tags = db.execute(text("SELECT qualys_tag_id, name FROM qualys_tags ORDER BY name")).fetchall()
# KPI : total / avec vuln / sans vuln + filtrage vuln_filter (with|zero) # KPI : total / avec vuln / sans vuln + filtrage vuln_filter (with|zero|critical|high|medium)
vuln_filter = request.query_params.get("vuln_filter", "") vuln_filter = request.query_params.get("vuln_filter", "")
def _vuln_total(a): def _vuln_total(a):
vc = vuln_map.get(str(a.ip_address), {}) vc = vuln_map.get(str(a.ip_address), {})
if isinstance(vc, dict): if isinstance(vc, dict):
return int(vc.get("total", 0) or 0) return int(vc.get("total", 0) or 0)
return int(vc or 0) return int(vc or 0)
def _max_sev(a):
vc = vuln_map.get(str(a.ip_address), {})
if not isinstance(vc, dict):
return 0
if vc.get("severity5", 0) > 0: return 5
if vc.get("severity4", 0) > 0: return 4
if vc.get("severity3", 0) > 0: return 3
return 0
kpi_total = len(assets) if assets else 0 kpi_total = len(assets) if assets else 0
kpi_with_vuln = sum(1 for a in assets if _vuln_total(a) > 0) if assets else 0 kpi_with_vuln = sum(1 for a in assets if _vuln_total(a) > 0) if assets else 0
kpi_zero_vuln = kpi_total - kpi_with_vuln kpi_zero_vuln = kpi_total - kpi_with_vuln
@ -480,6 +488,12 @@ async def qualys_search(request: Request, db=Depends(get_db),
assets = [a for a in assets if _vuln_total(a) > 0] assets = [a for a in assets if _vuln_total(a) > 0]
elif assets and vuln_filter == "zero": elif assets and vuln_filter == "zero":
assets = [a for a in assets if _vuln_total(a) == 0] assets = [a for a in assets if _vuln_total(a) == 0]
elif assets and vuln_filter == "critical":
assets = [a for a in assets if _max_sev(a) == 5]
elif assets and vuln_filter == "high":
assets = [a for a in assets if _max_sev(a) == 4]
elif assets and vuln_filter == "medium":
assets = [a for a in assets if _max_sev(a) == 3]
ctx = base_context(request, db, user) ctx = base_context(request, db, user)
ctx.update({ ctx.update({
@ -1159,3 +1173,81 @@ async def qualys_deploy_check(request: Request, db=Depends(get_db)):
"active": sum(1 for r in results if r["status"] == "ACTIVE"), "active": sum(1 for r in results if r["status"] == "ACTIVE"),
"not_installed": sum(1 for r in results if r["status"] == "NOT_INSTALLED"), "not_installed": sum(1 for r in results if r["status"] == "NOT_INSTALLED"),
"failed": sum(1 for r in results if r["status"] == "CONNECTION_FAILED")}) "failed": sum(1 for r in results if r["status"] == "CONNECTION_FAILED")})
# === DASHBOARD VULNERABILITES (KPI + historique) ===
@router.get("/qualys/dashboard", response_class=HTMLResponse)
async def qualys_dashboard(request: Request, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
# TODO: ajouter perm specifique 'qualys_dashboard_view' (admin + DSI)
if not can_view(perms, "qualys"):
return RedirectResponse(url="/dashboard")
from app.services.qualys_service import load_vuln_dashboard, is_dashboard_running
data = load_vuln_dashboard(db)
env_set = sorted({d["name"] for d in data["env"]})
pos_set = sorted({d["name"] for d in data["pos"]})
matrix = {(c["name"], c["name2"]): c for c in data["env_pos"]}
ctx = base_context(request, db, user)
ctx.update({"data": data, "env_list": env_set, "pos_list": pos_set,
"matrix": matrix, "is_running": is_dashboard_running()})
return templates.TemplateResponse("qualys_dashboard.html", ctx)
@router.post("/qualys/dashboard/refresh")
async def qualys_dashboard_refresh(request: Request, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_edit(perms, "qualys"):
return RedirectResponse(url="/qualys/dashboard")
from app.services.qualys_service import compute_vuln_dashboard, is_dashboard_running
if is_dashboard_running():
return RedirectResponse(url="/qualys/dashboard?msg=already_running", status_code=303)
import threading
def _runner():
from app.database import SessionLocal
s = SessionLocal()
try:
compute_vuln_dashboard(s, triggered_by=f"manual:{user.username}")
finally:
s.close()
threading.Thread(target=_runner, daemon=True).start()
return RedirectResponse(url="/qualys/dashboard?msg=refresh_started", status_code=303)
@router.get("/qualys/dashboard/history", response_class=HTMLResponse)
async def qualys_dashboard_history(request: Request, db=Depends(get_db),
period: str = "day", days: int = 30,
dimension: str = "global", dim_value: str = "all"):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
perms = get_user_perms(db, user)
if not can_view(perms, "qualys"):
return RedirectResponse(url="/dashboard")
from app.services.qualys_service import load_vuln_history
if period not in ("day", "week", "month"):
period = "day"
days = max(7, min(365, days))
series = load_vuln_history(db, period=period, days=days,
dimension=dimension, dimension_value=dim_value)
# Liste des dim/value disponibles dans les snapshots pour le selecteur
dim_options = db.execute(text("""SELECT DISTINCT dimension, dimension_value
FROM qualys_vuln_snapshot WHERE dimension IN ('global','env','pos','os')
ORDER BY dimension, dimension_value""")).fetchall()
runs_count = db.execute(text("SELECT COUNT(*) FROM qualys_vuln_snapshot_run WHERE status='ok'")).scalar()
ctx = base_context(request, db, user)
ctx.update({"series": series, "period": period, "days": days,
"dimension": dimension, "dim_value": dim_value,
"dim_options": dim_options, "runs_count": runs_count})
return templates.TemplateResponse("qualys_dashboard_history.html", ctx)

View File

@ -855,3 +855,211 @@ def cancel_refresh():
def is_refresh_running(): def is_refresh_running():
return _refresh_running return _refresh_running
# ===========================================================================
# DASHBOARD VULNERABILITES — agregation par dimension + historique
# Tables: qualys_vuln_snapshot_run (un row par calcul) + qualys_vuln_snapshot
# Dimensions: global, env, pos, os, domain, env_pos
# Niveaux: critical (sev5), high (sev4), medium (sev3), sain, non_scanne
# Un serveur est compte dans son niveau MAX (1 seule colonne)
# ===========================================================================
_dashboard_running = False
def is_dashboard_running():
return _dashboard_running
def _classify_severity(vc):
if not isinstance(vc, dict):
return "sain"
if vc.get("severity5", 0) > 0:
return "critical"
if vc.get("severity4", 0) > 0:
return "high"
if vc.get("severity3", 0) > 0:
return "medium"
return "sain"
def _is_scanned(asset_row, has_vuln_data):
status = (asset_row.agent_status or "").lower()
if status in ("active", "ok", "status_active"):
return True
if has_vuln_data:
return True
return False
def compute_vuln_dashboard(db, triggered_by="manual"):
"""Calcule un nouveau snapshot (insert run + snapshot rows).
Retourne dict {ok, msg, run_id, asset_count, duration_sec}."""
global _dashboard_running
if _dashboard_running:
return {"ok": False, "msg": "Calcul deja en cours", "run_id": None,
"asset_count": 0, "duration_sec": 0}
_dashboard_running = True
import time
t0 = time.time()
run_id = None
try:
# 1. Creer le run en pending
run_id = db.execute(text("""
INSERT INTO qualys_vuln_snapshot_run (status, triggered_by)
VALUES ('pending', :tb) RETURNING id
"""), {"tb": triggered_by}).scalar()
db.commit()
# 2. Charger tous les assets avec leurs tags + domaine AD
rows = db.execute(text("""
SELECT qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status,
qa.last_checkin, qa.os_family, qa.server_id,
COALESCE(string_agg(DISTINCT qt.name, '|' ORDER BY qt.name), '') as tag_names,
s.domain_ltd
FROM qualys_assets qa
LEFT JOIN qualys_asset_tags qat ON qat.qualys_asset_id = qa.qualys_asset_id
LEFT JOIN qualys_tags qt ON qt.qualys_tag_id = qat.qualys_tag_id
LEFT JOIN servers s ON s.id = qa.server_id
GROUP BY qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status,
qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd
""")).fetchall()
asset_count = len(rows)
# 3. Recuperer vulns par batch de 50 IPs (cache 10min via get_vuln_counts)
ip_to_vuln = {}
unique_ips = list({str(r.ip_address) for r in rows
if r.ip_address and str(r.ip_address) != "None"})
for i in range(0, len(unique_ips), 50):
batch = unique_ips[i:i+50]
try:
vmap = get_vuln_counts(db, ",".join(batch))
if vmap:
ip_to_vuln.update(vmap)
except Exception:
pass
# 4. Classifier + agreger
agg = {}
def _bump(dim, val, val2, level):
key = (dim, val or "(none)", val2 or "")
if key not in agg:
agg[key] = {"total": 0, "critical": 0, "high": 0, "medium": 0,
"sain": 0, "non_scanne": 0}
agg[key]["total"] += 1
agg[key][level] += 1
for r in rows:
ip = str(r.ip_address) if r.ip_address else None
vc = ip_to_vuln.get(ip) if ip else None
scanned = _is_scanned(r, vc is not None)
level = "non_scanne" if not scanned else _classify_severity(vc or {})
tags = (r.tag_names or "").split("|") if r.tag_names else []
envs = [t for t in tags if t.startswith("ENV-")]
poses = [t for t in tags if t.startswith("POS-")]
oses = [t for t in tags if t.startswith("OS-")]
dom = r.domain_ltd or "(sans domaine)"
_bump("global", "all", "", level)
for e in envs or ["(sans env)"]:
_bump("env", e, "", level)
for p in poses or ["(sans pos)"]:
_bump("pos", p, "", level)
for o in oses or ["(sans os)"]:
_bump("os", o, "", level)
_bump("domain", dom, "", level)
for e in envs or ["(sans env)"]:
for p in poses or ["(sans pos)"]:
_bump("env_pos", e, p, level)
# 5. Persister les agregats
for (dim, val, val2), counts in agg.items():
db.execute(text("""
INSERT INTO qualys_vuln_snapshot
(run_id, dimension, dimension_value, dimension_value2,
total, critical, high, medium, sain, non_scanne)
VALUES (:rid, :dim, :v1, :v2, :tot, :c, :h, :m, :s, :ns)
"""), {"rid": run_id, "dim": dim, "v1": val, "v2": val2 or None,
"tot": counts["total"], "c": counts["critical"],
"h": counts["high"], "m": counts["medium"],
"s": counts["sain"], "ns": counts["non_scanne"]})
duration = int(time.time() - t0)
db.execute(text("""UPDATE qualys_vuln_snapshot_run
SET status='ok', asset_count=:ac, duration_sec=:d, msg='OK' WHERE id=:rid"""),
{"ac": asset_count, "d": duration, "rid": run_id})
db.commit()
return {"ok": True, "msg": "OK", "run_id": run_id,
"asset_count": asset_count, "duration_sec": duration}
except Exception as ex:
db.rollback()
if run_id:
try:
db.execute(text("""UPDATE qualys_vuln_snapshot_run
SET status='error', msg=:m, duration_sec=:d WHERE id=:rid"""),
{"m": str(ex)[:500], "d": int(time.time() - t0), "rid": run_id})
db.commit()
except Exception:
pass
return {"ok": False, "msg": str(ex), "run_id": run_id,
"asset_count": 0, "duration_sec": int(time.time() - t0)}
finally:
_dashboard_running = False
def load_vuln_dashboard(db):
"""Charge le dernier run reussi + ses snapshots."""
last_run = db.execute(text("""SELECT id, run_at, asset_count, duration_sec, triggered_by
FROM qualys_vuln_snapshot_run WHERE status='ok' ORDER BY run_at DESC LIMIT 1""")).fetchone()
if not last_run:
return {"global": None, "env": [], "pos": [], "os": [], "domain": [],
"env_pos": [], "last_run": None}
rows = db.execute(text("""SELECT dimension, dimension_value, dimension_value2,
total, critical, high, medium, sain, non_scanne
FROM qualys_vuln_snapshot WHERE run_id=:rid
ORDER BY dimension, dimension_value, dimension_value2"""), {"rid": last_run.id}).fetchall()
out = {"global": None, "env": [], "pos": [], "os": [], "domain": [],
"env_pos": [], "last_run": last_run}
for r in rows:
d = {"name": r.dimension_value, "name2": r.dimension_value2,
"total": r.total, "critical": r.critical, "high": r.high,
"medium": r.medium, "sain": r.sain, "non_scanne": r.non_scanne,
"vuln_total": r.critical + r.high + r.medium,
"scanned": r.total - r.non_scanne}
d["pct_vuln"] = round(100 * d["vuln_total"] / d["scanned"], 1) if d["scanned"] > 0 else 0
if r.dimension == "global":
out["global"] = d
elif r.dimension in out:
out[r.dimension].append(d)
return out
def load_vuln_history(db, period="day", days=30, dimension="global", dimension_value="all"):
"""Retourne l'evolution dans le temps.
period: 'day' (1 point/jour), 'week' (lundi), 'month' (1er du mois)
dimension/dimension_value: filtre (defaut: global/all)
Renvoie list de dicts {bucket, total, critical, high, medium, sain, non_scanne}"""
if period == "month":
bucket_expr = "date_trunc('month', r.run_at)"
elif period == "week":
bucket_expr = "date_trunc('week', r.run_at)"
else:
bucket_expr = "date_trunc('day', r.run_at)"
rows = db.execute(text(f"""
SELECT {bucket_expr} as bucket,
s.total, s.critical, s.high, s.medium, s.sain, s.non_scanne,
r.run_at
FROM qualys_vuln_snapshot s
JOIN qualys_vuln_snapshot_run r ON r.id = s.run_id
WHERE r.status='ok' AND r.run_at >= now() - (:days || ' days')::interval
AND s.dimension = :dim AND s.dimension_value = :dv
ORDER BY r.run_at
"""), {"days": days, "dim": dimension, "dv": dimension_value}).fetchall()
# Garder le dernier snapshot de chaque bucket
by_bucket = {}
for r in rows:
by_bucket[r.bucket] = r
return [{"bucket": b.isoformat(), "total": r.total, "critical": r.critical,
"high": r.high, "medium": r.medium, "sain": r.sain,
"non_scanne": r.non_scanne, "vuln_total": r.critical + r.high + r.medium}
for b, r in sorted(by_bucket.items())]

View File

@ -133,6 +133,7 @@
<span x-text="open === 'qualys' ? '▾' : '▸'" class="text-xs"></span> <span x-text="open === 'qualys' ? '▾' : '▸'" class="text-xs"></span>
</button> </button>
<div x-show="open === 'qualys'" x-cloak class="space-y-1 pl-1"> <div x-show="open === 'qualys'" x-cloak class="space-y-1 pl-1">
<a href="/qualys/dashboard" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/dashboard' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">📊 Dashboard Vulns</a>
<a href="/qualys/search" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/search' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Recherche</a> <a href="/qualys/search" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/search' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Recherche</a>
<a href="/qualys/tags" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/tags' in path and '/qualys/tagsv3' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Tags</a> <a href="/qualys/tags" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/tags' in path and '/qualys/tagsv3' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Tags</a>
<a href="/qualys/tagsv3" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/tagsv3' in path and '/catalog' not in path and '/gap' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-8">↳ Tags V3 (vue)</a> <a href="/qualys/tagsv3" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/qualys/tagsv3' in path and '/catalog' not in path and '/gap' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-8">↳ Tags V3 (vue)</a>

View File

@ -0,0 +1,163 @@
{% extends 'base.html' %}
{% block title %}Dashboard Vulnérabilités{% endblock %}
{% block content %}
{% set msg = request.query_params.get('msg', '') %}
{% if msg == 'refresh_started' %}
<div class="card p-3 mb-4 bg-blue-900/20 border-blue-500/40 text-blue-300 text-sm">
Recalcul lancé en arrière-plan. Rafraîchis la page dans 1-2 minutes pour voir les nouveaux chiffres.
</div>
{% elif msg == 'already_running' %}
<div class="card p-3 mb-4 bg-yellow-900/20 border-yellow-500/40 text-yellow-300 text-sm">
Un calcul est déjà en cours. Patiente puis rafraîchis.
</div>
{% endif %}
<div class="flex justify-between items-center mb-4">
<h2 class="text-xl font-bold text-cyber-accent">Dashboard Vulnérabilités</h2>
<div class="flex gap-2 items-center">
{% if data.last_run %}
<span class="text-xs text-gray-500">
Dernier calcul : {{ data.last_run.run_at.strftime('%Y-%m-%d %H:%M') }}
({{ data.last_run.asset_count }} assets, {{ data.last_run.duration_sec }}s)
</span>
{% endif %}
<a href="/qualys/dashboard/history" class="btn-sm bg-cyber-border text-cyber-accent">📈 Historique</a>
<form method="POST" action="/qualys/dashboard/refresh" style="display:inline">
<button class="btn-sm bg-cyber-accent text-black" {% if is_running %}disabled{% endif %}
data-loading="Recalcul en cours...|Calcul des KPI sur tous les assets">
{% if is_running %}⏳ En cours...{% else %}🔄 Recalculer{% endif %}
</button>
</form>
</div>
</div>
{% if not data.last_run %}
<div class="card p-6 text-center">
<p class="text-gray-400 mb-4">Aucun snapshot disponible. Lance un premier calcul.</p>
<form method="POST" action="/qualys/dashboard/refresh">
<button class="btn-primary px-6 py-2">🔄 Lancer le premier calcul</button>
</form>
</div>
{% else %}
<!-- KPI bandeau (6 cards) -->
{% set g = data.global %}
<div class="grid grid-cols-6 gap-2 mb-4">
<div class="card p-3 border-cyber-accent">
<div class="text-xs text-gray-500 uppercase">Total</div>
<div class="text-2xl font-bold text-cyber-accent">{{ g.total if g else 0 }}</div>
<div class="text-xs text-gray-500">serveurs</div>
</div>
<a href="/qualys/search?vuln_filter=critical" class="card p-3 border-red-700 hover:border-red-500 transition">
<div class="text-xs text-gray-500 uppercase">🔴 Critique (sev 5)</div>
<div class="text-2xl font-bold text-red-500">{{ g.critical if g else 0 }}</div>
<div class="text-xs text-gray-500">{{ ((g.critical * 100 / g.scanned) | round(1)) if g and g.scanned > 0 else 0 }}%</div>
</a>
<a href="/qualys/search?vuln_filter=high" class="card p-3 border-orange-700 hover:border-orange-500 transition">
<div class="text-xs text-gray-500 uppercase">🟠 High (sev 4)</div>
<div class="text-2xl font-bold text-orange-500">{{ g.high if g else 0 }}</div>
<div class="text-xs text-gray-500">{{ ((g.high * 100 / g.scanned) | round(1)) if g and g.scanned > 0 else 0 }}%</div>
</a>
<a href="/qualys/search?vuln_filter=medium" class="card p-3 border-yellow-700 hover:border-yellow-500 transition">
<div class="text-xs text-gray-500 uppercase">🟡 Medium (sev 3)</div>
<div class="text-2xl font-bold text-yellow-500">{{ g.medium if g else 0 }}</div>
<div class="text-xs text-gray-500">{{ ((g.medium * 100 / g.scanned) | round(1)) if g and g.scanned > 0 else 0 }}%</div>
</a>
<a href="/qualys/search?vuln_filter=zero" class="card p-3 border-green-700 hover:border-green-500 transition">
<div class="text-xs text-gray-500 uppercase">🟢 Sans vuln</div>
<div class="text-2xl font-bold text-green-500">{{ g.sain if g else 0 }}</div>
<div class="text-xs text-gray-500">{{ ((g.sain * 100 / g.scanned) | round(1)) if g and g.scanned > 0 else 0 }}%</div>
</a>
<div class="card p-3 border-gray-700">
<div class="text-xs text-gray-500 uppercase">⚫ Non scanné</div>
<div class="text-2xl font-bold text-gray-400">{{ g.non_scanne if g else 0 }}</div>
<div class="text-xs text-gray-500">{{ ((g.non_scanne * 100 / g.total) | round(1)) if g and g.total > 0 else 0 }}% du total</div>
</div>
</div>
{% macro pivot_table(title, items, tag_prefix='') %}
<div class="card p-3 mb-4">
<h3 class="text-sm font-bold text-cyber-accent mb-2">{{ title }}</h3>
<table class="w-full text-xs">
<thead class="text-gray-400 border-b border-cyber-border">
<tr>
<th class="text-left py-1">Catégorie</th>
<th class="text-right py-1">Total</th>
<th class="text-right py-1 text-red-400">🔴 Critique</th>
<th class="text-right py-1 text-orange-400">🟠 High</th>
<th class="text-right py-1 text-yellow-400">🟡 Medium</th>
<th class="text-right py-1 text-green-400">🟢 Sain</th>
<th class="text-right py-1 text-gray-500">⚫ Non scanné</th>
<th class="text-right py-1">% vuln</th>
</tr>
</thead>
<tbody>
{% for it in items|sort(attribute='vuln_total', reverse=true) %}
<tr class="border-b border-cyber-border/30 hover:bg-cyber-card/50">
<td class="py-1 font-mono">{{ it.name }}</td>
<td class="text-right py-1">{{ it.total }}</td>
<td class="text-right py-1">
{% if it.critical > 0 %}<a href="/qualys/search?field=tag&search={{ it.name }}&vuln_filter=critical" class="text-red-400 hover:underline">{{ it.critical }}</a>{% else %}-{% endif %}
</td>
<td class="text-right py-1">
{% if it.high > 0 %}<a href="/qualys/search?field=tag&search={{ it.name }}&vuln_filter=high" class="text-orange-400 hover:underline">{{ it.high }}</a>{% else %}-{% endif %}
</td>
<td class="text-right py-1">
{% if it.medium > 0 %}<a href="/qualys/search?field=tag&search={{ it.name }}&vuln_filter=medium" class="text-yellow-400 hover:underline">{{ it.medium }}</a>{% else %}-{% endif %}
</td>
<td class="text-right py-1">
{% if it.sain > 0 %}<a href="/qualys/search?field=tag&search={{ it.name }}&vuln_filter=zero" class="text-green-400 hover:underline">{{ it.sain }}</a>{% else %}-{% endif %}
</td>
<td class="text-right py-1 text-gray-500">{{ it.non_scanne if it.non_scanne > 0 else '-' }}</td>
<td class="text-right py-1 font-bold {% if it.pct_vuln >= 50 %}text-red-400{% elif it.pct_vuln >= 25 %}text-orange-400{% else %}text-green-400{% endif %}">
{{ it.pct_vuln }}%
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endmacro %}
{{ pivot_table('Par environnement (ENV-*)', data.env, 'ENV-') }}
{{ pivot_table('Par position réseau (POS-*)', data.pos, 'POS-') }}
{{ pivot_table('Par OS (OS-*)', data.os, 'OS-') }}
{{ pivot_table('Par domaine AD', data.domain, '') }}
<!-- Matrice ENV x POS heatmap -->
<div class="card p-3 mb-4">
<h3 class="text-sm font-bold text-cyber-accent mb-2">Matrice Environnement × Position (% vuln)</h3>
<table class="w-full text-xs">
<thead>
<tr>
<th class="text-left py-1 text-gray-400">ENV \ POS</th>
{% for p in pos_list %}<th class="text-center py-1 text-gray-400 font-mono">{{ p }}</th>{% endfor %}
</tr>
</thead>
<tbody>
{% for e in env_list %}
<tr>
<td class="py-1 font-mono text-cyber-accent">{{ e }}</td>
{% for p in pos_list %}
{% set cell = matrix.get((e,p)) %}
{% if cell and cell.total > 0 %}
<td class="text-center py-1 px-1 {% if cell.pct_vuln >= 50 %}bg-red-900/40{% elif cell.pct_vuln >= 25 %}bg-orange-900/30{% elif cell.pct_vuln > 0 %}bg-yellow-900/20{% else %}bg-green-900/20{% endif %}">
<a href="/qualys/search?field=tag&search={{ e }}" class="hover:underline">
<div class="font-bold">{{ cell.vuln_total }}/{{ cell.scanned }}</div>
<div class="text-[10px] text-gray-500">{{ cell.pct_vuln }}%</div>
</a>
</td>
{% else %}
<td class="text-center py-1 px-1 text-gray-700">-</td>
{% endif %}
{% endfor %}
</tr>
{% endfor %}
</tbody>
</table>
<p class="text-xs text-gray-500 mt-2">Format : <strong>vulnérables / scannés</strong> — couleur = % vulnérables (rouge ≥50%, orange ≥25%, jaune &gt;0)</p>
</div>
{% endif %}
{% endblock %}

View File

@ -0,0 +1,124 @@
{% extends 'base.html' %}
{% block title %}Historique Vulnérabilités{% endblock %}
{% block content %}
<div class="flex justify-between items-center mb-4">
<h2 class="text-xl font-bold text-cyber-accent">Historique Vulnérabilités</h2>
<a href="/qualys/dashboard" class="btn-sm bg-cyber-border text-cyber-accent">← Dashboard</a>
</div>
<form method="GET" class="card p-3 mb-4 flex gap-3 items-end flex-wrap">
<div>
<label class="text-xs text-gray-500">Granularité</label>
<select name="period" class="text-xs py-1 px-2">
<option value="day" {% if period == 'day' %}selected{% endif %}>Journalier</option>
<option value="week" {% if period == 'week' %}selected{% endif %}>Hebdomadaire (lundi)</option>
<option value="month" {% if period == 'month' %}selected{% endif %}>Mensuel</option>
</select>
</div>
<div>
<label class="text-xs text-gray-500">Période (jours)</label>
<select name="days" class="text-xs py-1 px-2">
<option value="7" {% if days == 7 %}selected{% endif %}>7 jours</option>
<option value="30" {% if days == 30 %}selected{% endif %}>30 jours</option>
<option value="90" {% if days == 90 %}selected{% endif %}>90 jours</option>
<option value="180" {% if days == 180 %}selected{% endif %}>180 jours</option>
<option value="365" {% if days == 365 %}selected{% endif %}>365 jours</option>
</select>
</div>
<div>
<label class="text-xs text-gray-500">Périmètre</label>
<select name="dim_value" class="text-xs py-1 px-2" onchange="
var v = this.value;
var parts = v.split('|');
document.getElementById('dim-input').value = parts[0];
document.getElementById('dimv-input').value = parts[1];
">
<option value="global|all" {% if dimension == 'global' %}selected{% endif %}>Global (tous serveurs)</option>
{% for o in dim_options %}
{% if o.dimension != 'global' %}
<option value="{{ o.dimension }}|{{ o.dimension_value }}"
{% if dimension == o.dimension and dim_value == o.dimension_value %}selected{% endif %}>
{{ o.dimension|upper }} : {{ o.dimension_value }}
</option>
{% endif %}
{% endfor %}
</select>
<input type="hidden" name="dimension" id="dim-input" value="{{ dimension }}">
<input type="hidden" name="dim_value" id="dimv-input" value="{{ dim_value }}">
</div>
<button type="submit" class="btn-primary px-4 py-1 text-sm">Afficher</button>
<span class="text-xs text-gray-500 ml-auto">{{ runs_count }} snapshot(s) total en base</span>
</form>
{% if not series %}
<div class="card p-6 text-center text-gray-400">
Aucune donnée historique pour ce périmètre/période. Lance des snapshots quotidiens pour alimenter.
</div>
{% else %}
<div class="card p-3 mb-4">
<canvas id="chart" style="max-height:400px"></canvas>
</div>
<div class="card p-3">
<h3 class="text-sm font-bold text-cyber-accent mb-2">Données ({{ series|length }} points)</h3>
<table class="w-full text-xs">
<thead class="text-gray-400 border-b border-cyber-border">
<tr>
<th class="text-left py-1">Date</th>
<th class="text-right py-1">Total</th>
<th class="text-right py-1 text-red-400">🔴 Critique</th>
<th class="text-right py-1 text-orange-400">🟠 High</th>
<th class="text-right py-1 text-yellow-400">🟡 Medium</th>
<th class="text-right py-1 text-green-400">🟢 Sain</th>
<th class="text-right py-1 text-gray-500">⚫ Non scanné</th>
<th class="text-right py-1">Total vuln</th>
</tr>
</thead>
<tbody>
{% for p in series|reverse %}
<tr class="border-b border-cyber-border/30">
<td class="py-1 font-mono">{{ p.bucket[:10] }}</td>
<td class="text-right">{{ p.total }}</td>
<td class="text-right text-red-400">{{ p.critical }}</td>
<td class="text-right text-orange-400">{{ p.high }}</td>
<td class="text-right text-yellow-400">{{ p.medium }}</td>
<td class="text-right text-green-400">{{ p.sain }}</td>
<td class="text-right text-gray-500">{{ p.non_scanne }}</td>
<td class="text-right font-bold">{{ p.vuln_total }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<script src="https://cdn.jsdelivr.net/npm/chart.js@4"></script>
<script>
const data = {{ series|tojson }};
const labels = data.map(p => p.bucket.substring(0,10));
new Chart(document.getElementById('chart'), {
type: 'line',
data: {
labels: labels,
datasets: [
{label: '🔴 Critique', data: data.map(p => p.critical), borderColor: '#ef4444', backgroundColor: 'rgba(239,68,68,.1)', tension: .2, fill: true},
{label: '🟠 High', data: data.map(p => p.high), borderColor: '#f97316', backgroundColor: 'rgba(249,115,22,.1)', tension: .2, fill: true},
{label: '🟡 Medium', data: data.map(p => p.medium), borderColor: '#eab308', backgroundColor: 'rgba(234,179,8,.1)', tension: .2, fill: true},
{label: '🟢 Sain', data: data.map(p => p.sain), borderColor: '#22c55e', backgroundColor: 'rgba(34,197,94,.1)', tension: .2, fill: false, hidden: true},
{label: '⚫ Non scanné', data: data.map(p => p.non_scanne), borderColor: '#6b7280', backgroundColor: 'rgba(107,114,128,.1)', tension: .2, fill: false, hidden: true}
]
},
options: {
responsive: true,
plugins: {legend: {labels: {color: '#cbd5e1'}}},
scales: {
x: {ticks: {color: '#94a3b8'}, grid: {color: 'rgba(255,255,255,.05)'}},
y: {ticks: {color: '#94a3b8'}, grid: {color: 'rgba(255,255,255,.05)'}, beginAtZero: true}
}
}
});
</script>
{% endif %}
{% endblock %}

14
scripts/snapshot_runner.py Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""Lance un snapshot dashboard vuln. Appele par systemd timer (1x/jour)."""
import sys, os
sys.path.insert(0, "/opt/patchcenter")
from app.database import SessionLocal
from app.services.qualys_service import compute_vuln_dashboard
s = SessionLocal()
try:
res = compute_vuln_dashboard(s, triggered_by="cron")
print(f"[snapshot] ok={res['ok']} run_id={res['run_id']} assets={res['asset_count']} dur={res['duration_sec']}s msg={res['msg']}")
sys.exit(0 if res["ok"] else 1)
finally:
s.close()