From 0d4ce6dfc293093f1c29350c1b62afdb45361bda Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Sat, 25 Apr 2026 19:56:23 +0000 Subject: [PATCH] feat(qualys/duplicates): scan filtre Linux+Windows Server uniquement (~1200 vs 6244) --- app/services/qualys_service.py | 104 +++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 44 deletions(-) diff --git a/app/services/qualys_service.py b/app/services/qualys_service.py index 3447046..5bb626e 100644 --- a/app/services/qualys_service.py +++ b/app/services/qualys_service.py @@ -1111,54 +1111,70 @@ def load_vuln_history(db, period="day", days=30, dimension="global", dimension_v # =========================================================================== def fetch_all_qualys_assets(db, with_progress=False): - """Recupere TOUS les assets Qualys via API avec pagination (lastId). - Retourne liste de dicts {id, name, ip, last_check, agent_status}.""" + """Recupere les assets Qualys SERVEURS via API (tags built-in Linux Server + Windows Server). + Pagination via id GREATER, batch 1000. Filtre directement cote API pour eviter de scanner + les ~5000 postes de travail (gain de temps x4). + Retourne liste de dicts {id, name, ip, os, last_check, agent_status}.""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return [] proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None - all_assets = [] - last_id = 0 - while True: - body = {"ServiceRequest": {"preferences": {"limitResults": 1000}}} - if last_id: - body["ServiceRequest"]["filters"] = {"Criteria": [ - {"field": "id", "operator": "GREATER", "value": str(last_id)} - ]} - try: - r = requests.post( - f"{qualys_url}/qps/rest/2.0/search/am/hostasset", - json=body, auth=(qualys_user, qualys_pass), - verify=False, timeout=180, proxies=proxies, - headers={"Content-Type": "application/json"} - ) - except Exception: - break - if r.status_code != 200 or "SUCCESS" not in r.text: - break - batch = [] - for block in r.text.split("")[1:]: - block = block.split("")[0] - aid = (parse_xml(block, "id") or [""])[0] - name = (parse_xml(block, "name") or [""])[0] - addr = (parse_xml(block, "address") or [""])[0] - os_str = (parse_xml(block, "os") or [""])[0] - last_check = "" - if "" in block: - last_check = (parse_xml(block, "lastCheckedIn") or [""])[0] - agent_status = "" - if "" in block: - agent_status = (parse_xml(block, "status") or [""])[0] - if aid and aid.isdigit(): - batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str, - "last_check": last_check, "agent_status": agent_status}) - if not batch: - break - all_assets.extend(batch) - last_id = max(b["id"] for b in batch) - if len(batch) < 1000: - break - return all_assets + + def fetch_by_tag(tag_name): + results = [] + last_id = 0 + while True: + criteria = [{"field": "tagName", "operator": "EQUALS", "value": tag_name}] + if last_id: + criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)}) + body = {"ServiceRequest": {"preferences": {"limitResults": 1000}, + "filters": {"Criteria": criteria}}} + try: + r = requests.post( + f"{qualys_url}/qps/rest/2.0/search/am/hostasset", + json=body, auth=(qualys_user, qualys_pass), + verify=False, timeout=180, proxies=proxies, + headers={"Content-Type": "application/json"} + ) + except Exception: + break + if r.status_code != 200 or "SUCCESS" not in r.text: + break + batch = [] + for block in r.text.split("")[1:]: + block = block.split("")[0] + aid = (parse_xml(block, "id") or [""])[0] + name = (parse_xml(block, "name") or [""])[0] + addr = (parse_xml(block, "address") or [""])[0] + os_str = (parse_xml(block, "os") or [""])[0] + last_check = "" + if "" in block: + last_check = (parse_xml(block, "lastCheckedIn") or [""])[0] + agent_status = "" + if "" in block: + agent_status = (parse_xml(block, "status") or [""])[0] + if aid and aid.isdigit(): + batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str, + "last_check": last_check, "agent_status": agent_status}) + if not batch: + break + results.extend(batch) + last_id = max(b["id"] for b in batch) + if len(batch) < 1000: + break + return results + + # Linux Server + Windows Server (tags built-in Qualys, couvre tous les serveurs) + linux = fetch_by_tag("Linux Server") + windows = fetch_by_tag("Windows Server") + # Dedupe par id (un asset pourrait theoriquement avoir les 2 tags) + seen = set() + out = [] + for a in linux + windows: + if a["id"] not in seen: + seen.add(a["id"]) + out.append(a) + return out def find_duplicate_hostnames(db, force_refresh=False):