From 8f406f211d9591ae4cfd48d8967afa82272611ff Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Sat, 25 Apr 2026 00:42:29 +0000 Subject: [PATCH] feat(qualys/dashboard): compute v2 - interroge API Qualys par tag (DB locale qualys_asset_tags souvent obsolete) --- app/services/qualys_service.py | 92 +++++++++++++++++++++++++--------- 1 file changed, 69 insertions(+), 23 deletions(-) diff --git a/app/services/qualys_service.py b/app/services/qualys_service.py index e5ffdc8..b476031 100644 --- a/app/services/qualys_service.py +++ b/app/services/qualys_service.py @@ -889,10 +889,44 @@ def _is_scanned(asset_row, has_vuln_data): return True return False +def _fetch_asset_ids_by_tag(db, tag_name): + """Appelle Qualys API pour recuperer la liste d'asset_ids ayant ce tag.""" + qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) + if not qualys_user: + return set() + proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None + try: + r = requests.post( + f"{qualys_url}/qps/rest/2.0/search/am/hostasset", + json={"ServiceRequest": { + "preferences": {"limitResults": 1000}, + "filters": {"Criteria": [ + {"field": "tagName", "operator": "EQUALS", "value": tag_name} + ]} + }}, + auth=(qualys_user, qualys_pass), + verify=False, timeout=120, proxies=proxies, + headers={"Content-Type": "application/json"} + ) + if r.status_code != 200 or "SUCCESS" not in r.text: + return set() + ids = set() + for block in r.text.split("")[1:]: + block = block.split("")[0] + aid = (parse_xml(block, "id") or [""])[0] + if aid and aid.isdigit(): + ids.add(int(aid)) + return ids + except Exception: + return set() + + def compute_vuln_dashboard(db, triggered_by="manual", run_id=None): - """Calcule un nouveau snapshot. Si run_id fourni, l'utilise (sinon en cree un). - Retourne dict {ok, msg, run_id, asset_count, duration_sec}.""" + """Calcule un nouveau snapshot. Interroge l'API Qualys par tag pour avoir + les vraies associations asset<->tag (la DB locale qualys_asset_tags peut etre obsolete). + Si run_id fourni, l'utilise (sinon en cree un).""" import time + from concurrent.futures import ThreadPoolExecutor t0 = time.time() try: # 1. Creer le run en pending si pas deja fourni @@ -903,22 +937,29 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None): """), {"tb": triggered_by}).scalar() db.commit() - # 2. Charger tous les assets avec leurs tags + domaine AD + # 2. Tags a interroger : ENV-*, POS-*, OS-* + tag_rows = db.execute(text("""SELECT name FROM qualys_tags + WHERE name ~ '^(ENV|POS|OS)-' ORDER BY name""")).fetchall() + tag_names = [r.name for r in tag_rows] + + # 3. Pour chaque tag, appel API parallele -> {tag_name: set(asset_id)} + def fetch_one(name): + return name, _fetch_asset_ids_by_tag(db, name) + tag_id_map = {} + with ThreadPoolExecutor(max_workers=8) as ex: + for name, ids in ex.map(fetch_one, tag_names): + tag_id_map[name] = ids + + # 4. Charger tous les assets locaux (pour total + IPs + agent_status + domain_ltd) rows = db.execute(text(""" SELECT qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status, - qa.last_checkin, qa.os_family, qa.server_id, - COALESCE(string_agg(DISTINCT qt.name, '|' ORDER BY qt.name), '') as tag_names, - s.domain_ltd + qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd FROM qualys_assets qa - LEFT JOIN qualys_asset_tags qat ON qat.qualys_asset_id = qa.qualys_asset_id - LEFT JOIN qualys_tags qt ON qt.qualys_tag_id = qat.qualys_tag_id LEFT JOIN servers s ON s.id = qa.server_id - GROUP BY qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status, - qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd """)).fetchall() asset_count = len(rows) - # 3. Recuperer vulns par batch de 50 IPs (cache 10min via get_vuln_counts) + # 5. Recuperer vulns par batch de 50 IPs (cache 10min via get_vuln_counts) ip_to_vuln = {} unique_ips = list({str(r.ip_address) for r in rows if r.ip_address and str(r.ip_address) != "None"}) @@ -931,7 +972,7 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None): except Exception: pass - # 4. Classifier + agreger + # 6. Classifier + agreger en utilisant les associations API agg = {} def _bump(dim, val, val2, level): key = (dim, val or "(none)", val2 or "") @@ -942,30 +983,33 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None): agg[key][level] += 1 for r in rows: + aid = int(r.qualys_asset_id) ip = str(r.ip_address) if r.ip_address else None vc = ip_to_vuln.get(ip) if ip else None scanned = _is_scanned(r, vc is not None) level = "non_scanne" if not scanned else _classify_severity(vc or {}) - tags = (r.tag_names or "").split("|") if r.tag_names else [] - envs = [t for t in tags if t.startswith("ENV-")] - poses = [t for t in tags if t.startswith("POS-")] - oses = [t for t in tags if t.startswith("OS-")] + envs = sorted(t for t, ids in tag_id_map.items() + if t.startswith("ENV-") and aid in ids) + poses = sorted(t for t, ids in tag_id_map.items() + if t.startswith("POS-") and aid in ids) + oses = sorted(t for t, ids in tag_id_map.items() + if t.startswith("OS-") and aid in ids) dom = r.domain_ltd or "(sans domaine)" _bump("global", "all", "", level) for e in envs or ["(sans env)"]: _bump("env", e, "", level) - for p in poses or ["(sans pos)"]: - _bump("pos", p, "", level) + for pos in poses or ["(sans pos)"]: + _bump("pos", pos, "", level) for o in oses or ["(sans os)"]: _bump("os", o, "", level) _bump("domain", dom, "", level) for e in envs or ["(sans env)"]: - for p in poses or ["(sans pos)"]: - _bump("env_pos", e, p, level) + for pos in poses or ["(sans pos)"]: + _bump("env_pos", e, pos, level) - # 5. Persister les agregats + # 7. Persister for (dim, val, val2), counts in agg.items(): db.execute(text(""" INSERT INTO qualys_vuln_snapshot @@ -979,8 +1023,10 @@ def compute_vuln_dashboard(db, triggered_by="manual", run_id=None): duration = int(time.time() - t0) db.execute(text("""UPDATE qualys_vuln_snapshot_run - SET status='ok', asset_count=:ac, duration_sec=:d, msg='OK' WHERE id=:rid"""), - {"ac": asset_count, "d": duration, "rid": run_id}) + SET status='ok', asset_count=:ac, duration_sec=:d, + msg=:m WHERE id=:rid"""), + {"ac": asset_count, "d": duration, + "m": f"OK ({len(tag_names)} tags interroges)", "rid": run_id}) db.commit() return {"ok": True, "msg": "OK", "run_id": run_id, "asset_count": asset_count, "duration_sec": duration}