feat(qualys/duplicates): scan filtre Linux+Windows Server uniquement (~1200 vs 6244)

This commit is contained in:
Pierre & Lumière 2026-04-25 19:56:23 +00:00
parent 2c5c0df355
commit 0d4ce6dfc2

View File

@ -1111,54 +1111,70 @@ def load_vuln_history(db, period="day", days=30, dimension="global", dimension_v
# =========================================================================== # ===========================================================================
def fetch_all_qualys_assets(db, with_progress=False): def fetch_all_qualys_assets(db, with_progress=False):
"""Recupere TOUS les assets Qualys via API avec pagination (lastId). """Recupere les assets Qualys SERVEURS via API (tags built-in Linux Server + Windows Server).
Retourne liste de dicts {id, name, ip, last_check, agent_status}.""" Pagination via id GREATER, batch 1000. Filtre directement cote API pour eviter de scanner
les ~5000 postes de travail (gain de temps x4).
Retourne liste de dicts {id, name, ip, os, last_check, agent_status}."""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user: if not qualys_user:
return [] return []
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
all_assets = []
last_id = 0 def fetch_by_tag(tag_name):
while True: results = []
body = {"ServiceRequest": {"preferences": {"limitResults": 1000}}} last_id = 0
if last_id: while True:
body["ServiceRequest"]["filters"] = {"Criteria": [ criteria = [{"field": "tagName", "operator": "EQUALS", "value": tag_name}]
{"field": "id", "operator": "GREATER", "value": str(last_id)} if last_id:
]} criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
try: body = {"ServiceRequest": {"preferences": {"limitResults": 1000},
r = requests.post( "filters": {"Criteria": criteria}}}
f"{qualys_url}/qps/rest/2.0/search/am/hostasset", try:
json=body, auth=(qualys_user, qualys_pass), r = requests.post(
verify=False, timeout=180, proxies=proxies, f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
headers={"Content-Type": "application/json"} json=body, auth=(qualys_user, qualys_pass),
) verify=False, timeout=180, proxies=proxies,
except Exception: headers={"Content-Type": "application/json"}
break )
if r.status_code != 200 or "SUCCESS" not in r.text: except Exception:
break break
batch = [] if r.status_code != 200 or "SUCCESS" not in r.text:
for block in r.text.split("<HostAsset>")[1:]: break
block = block.split("</HostAsset>")[0] batch = []
aid = (parse_xml(block, "id") or [""])[0] for block in r.text.split("<HostAsset>")[1:]:
name = (parse_xml(block, "name") or [""])[0] block = block.split("</HostAsset>")[0]
addr = (parse_xml(block, "address") or [""])[0] aid = (parse_xml(block, "id") or [""])[0]
os_str = (parse_xml(block, "os") or [""])[0] name = (parse_xml(block, "name") or [""])[0]
last_check = "" addr = (parse_xml(block, "address") or [""])[0]
if "<lastCheckedIn>" in block: os_str = (parse_xml(block, "os") or [""])[0]
last_check = (parse_xml(block, "lastCheckedIn") or [""])[0] last_check = ""
agent_status = "" if "<lastCheckedIn>" in block:
if "<agentInfo>" in block: last_check = (parse_xml(block, "lastCheckedIn") or [""])[0]
agent_status = (parse_xml(block, "status") or [""])[0] agent_status = ""
if aid and aid.isdigit(): if "<agentInfo>" in block:
batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str, agent_status = (parse_xml(block, "status") or [""])[0]
"last_check": last_check, "agent_status": agent_status}) if aid and aid.isdigit():
if not batch: batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str,
break "last_check": last_check, "agent_status": agent_status})
all_assets.extend(batch) if not batch:
last_id = max(b["id"] for b in batch) break
if len(batch) < 1000: results.extend(batch)
break last_id = max(b["id"] for b in batch)
return all_assets if len(batch) < 1000:
break
return results
# Linux Server + Windows Server (tags built-in Qualys, couvre tous les serveurs)
linux = fetch_by_tag("Linux Server")
windows = fetch_by_tag("Windows Server")
# Dedupe par id (un asset pourrait theoriquement avoir les 2 tags)
seen = set()
out = []
for a in linux + windows:
if a["id"] not in seen:
seen.add(a["id"])
out.append(a)
return out
def find_duplicate_hostnames(db, force_refresh=False): def find_duplicate_hostnames(db, force_refresh=False):