feat(qualys/dashboard): compute v2 - interroge API Qualys par tag (DB locale qualys_asset_tags souvent obsolete)

This commit is contained in:
Pierre & Lumière 2026-04-25 00:42:29 +00:00
parent 54c10d90de
commit 8f406f211d

View File

@ -889,10 +889,44 @@ def _is_scanned(asset_row, has_vuln_data):
return True return True
return False return False
def _fetch_asset_ids_by_tag(db, tag_name):
"""Appelle Qualys API pour recuperer la liste d'asset_ids ayant ce tag."""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return set()
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
json={"ServiceRequest": {
"preferences": {"limitResults": 1000},
"filters": {"Criteria": [
{"field": "tagName", "operator": "EQUALS", "value": tag_name}
]}
}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=120, proxies=proxies,
headers={"Content-Type": "application/json"}
)
if r.status_code != 200 or "SUCCESS" not in r.text:
return set()
ids = set()
for block in r.text.split("<HostAsset>")[1:]:
block = block.split("</HostAsset>")[0]
aid = (parse_xml(block, "id") or [""])[0]
if aid and aid.isdigit():
ids.add(int(aid))
return ids
except Exception:
return set()
def compute_vuln_dashboard(db, triggered_by="manual", run_id=None): def compute_vuln_dashboard(db, triggered_by="manual", run_id=None):
"""Calcule un nouveau snapshot. Si run_id fourni, l'utilise (sinon en cree un). """Calcule un nouveau snapshot. Interroge l'API Qualys par tag pour avoir
Retourne dict {ok, msg, run_id, asset_count, duration_sec}.""" les vraies associations asset<->tag (la DB locale qualys_asset_tags peut etre obsolete).
Si run_id fourni, l'utilise (sinon en cree un)."""
import time import time
from concurrent.futures import ThreadPoolExecutor
t0 = time.time() t0 = time.time()
try: try:
# 1. Creer le run en pending si pas deja fourni # 1. Creer le run en pending si pas deja fourni
@ -903,22 +937,29 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None):
"""), {"tb": triggered_by}).scalar() """), {"tb": triggered_by}).scalar()
db.commit() db.commit()
# 2. Charger tous les assets avec leurs tags + domaine AD # 2. Tags a interroger : ENV-*, POS-*, OS-*
tag_rows = db.execute(text("""SELECT name FROM qualys_tags
WHERE name ~ '^(ENV|POS|OS)-' ORDER BY name""")).fetchall()
tag_names = [r.name for r in tag_rows]
# 3. Pour chaque tag, appel API parallele -> {tag_name: set(asset_id)}
def fetch_one(name):
return name, _fetch_asset_ids_by_tag(db, name)
tag_id_map = {}
with ThreadPoolExecutor(max_workers=8) as ex:
for name, ids in ex.map(fetch_one, tag_names):
tag_id_map[name] = ids
# 4. Charger tous les assets locaux (pour total + IPs + agent_status + domain_ltd)
rows = db.execute(text(""" rows = db.execute(text("""
SELECT qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status, SELECT qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status,
qa.last_checkin, qa.os_family, qa.server_id,
COALESCE(string_agg(DISTINCT qt.name, '|' ORDER BY qt.name), '') as tag_names,
s.domain_ltd
FROM qualys_assets qa
LEFT JOIN qualys_asset_tags qat ON qat.qualys_asset_id = qa.qualys_asset_id
LEFT JOIN qualys_tags qt ON qt.qualys_tag_id = qat.qualys_tag_id
LEFT JOIN servers s ON s.id = qa.server_id
GROUP BY qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status,
qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd
FROM qualys_assets qa
LEFT JOIN servers s ON s.id = qa.server_id
""")).fetchall() """)).fetchall()
asset_count = len(rows) asset_count = len(rows)
# 3. Recuperer vulns par batch de 50 IPs (cache 10min via get_vuln_counts) # 5. Recuperer vulns par batch de 50 IPs (cache 10min via get_vuln_counts)
ip_to_vuln = {} ip_to_vuln = {}
unique_ips = list({str(r.ip_address) for r in rows unique_ips = list({str(r.ip_address) for r in rows
if r.ip_address and str(r.ip_address) != "None"}) if r.ip_address and str(r.ip_address) != "None"})
@ -931,7 +972,7 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None):
except Exception: except Exception:
pass pass
# 4. Classifier + agreger # 6. Classifier + agreger en utilisant les associations API
agg = {} agg = {}
def _bump(dim, val, val2, level): def _bump(dim, val, val2, level):
key = (dim, val or "(none)", val2 or "") key = (dim, val or "(none)", val2 or "")
@ -942,30 +983,33 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None):
agg[key][level] += 1 agg[key][level] += 1
for r in rows: for r in rows:
aid = int(r.qualys_asset_id)
ip = str(r.ip_address) if r.ip_address else None ip = str(r.ip_address) if r.ip_address else None
vc = ip_to_vuln.get(ip) if ip else None vc = ip_to_vuln.get(ip) if ip else None
scanned = _is_scanned(r, vc is not None) scanned = _is_scanned(r, vc is not None)
level = "non_scanne" if not scanned else _classify_severity(vc or {}) level = "non_scanne" if not scanned else _classify_severity(vc or {})
tags = (r.tag_names or "").split("|") if r.tag_names else [] envs = sorted(t for t, ids in tag_id_map.items()
envs = [t for t in tags if t.startswith("ENV-")] if t.startswith("ENV-") and aid in ids)
poses = [t for t in tags if t.startswith("POS-")] poses = sorted(t for t, ids in tag_id_map.items()
oses = [t for t in tags if t.startswith("OS-")] if t.startswith("POS-") and aid in ids)
oses = sorted(t for t, ids in tag_id_map.items()
if t.startswith("OS-") and aid in ids)
dom = r.domain_ltd or "(sans domaine)" dom = r.domain_ltd or "(sans domaine)"
_bump("global", "all", "", level) _bump("global", "all", "", level)
for e in envs or ["(sans env)"]: for e in envs or ["(sans env)"]:
_bump("env", e, "", level) _bump("env", e, "", level)
for p in poses or ["(sans pos)"]: for pos in poses or ["(sans pos)"]:
_bump("pos", p, "", level) _bump("pos", pos, "", level)
for o in oses or ["(sans os)"]: for o in oses or ["(sans os)"]:
_bump("os", o, "", level) _bump("os", o, "", level)
_bump("domain", dom, "", level) _bump("domain", dom, "", level)
for e in envs or ["(sans env)"]: for e in envs or ["(sans env)"]:
for p in poses or ["(sans pos)"]: for pos in poses or ["(sans pos)"]:
_bump("env_pos", e, p, level) _bump("env_pos", e, pos, level)
# 5. Persister les agregats # 7. Persister
for (dim, val, val2), counts in agg.items(): for (dim, val, val2), counts in agg.items():
db.execute(text(""" db.execute(text("""
INSERT INTO qualys_vuln_snapshot INSERT INTO qualys_vuln_snapshot
@ -979,8 +1023,10 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None):
duration = int(time.time() - t0) duration = int(time.time() - t0)
db.execute(text("""UPDATE qualys_vuln_snapshot_run db.execute(text("""UPDATE qualys_vuln_snapshot_run
SET status='ok', asset_count=:ac, duration_sec=:d, msg='OK' WHERE id=:rid"""), SET status='ok', asset_count=:ac, duration_sec=:d,
{"ac": asset_count, "d": duration, "rid": run_id}) msg=:m WHERE id=:rid"""),
{"ac": asset_count, "d": duration,
"m": f"OK ({len(tag_names)} tags interroges)", "rid": run_id})
db.commit() db.commit()
return {"ok": True, "msg": "OK", "run_id": run_id, return {"ok": True, "msg": "OK", "run_id": run_id,
"asset_count": asset_count, "duration_sec": duration} "asset_count": asset_count, "duration_sec": duration}