diff --git a/app/services/qualys_service.py b/app/services/qualys_service.py
index 3447046..5bb626e 100644
--- a/app/services/qualys_service.py
+++ b/app/services/qualys_service.py
@@ -1111,54 +1111,70 @@ def load_vuln_history(db, period="day", days=30, dimension="global", dimension_v
# ===========================================================================
def fetch_all_qualys_assets(db, with_progress=False):
- """Recupere TOUS les assets Qualys via API avec pagination (lastId).
- Retourne liste de dicts {id, name, ip, last_check, agent_status}."""
+ """Recupere les assets Qualys SERVEURS via API (tags built-in Linux Server + Windows Server).
+ Pagination via id GREATER, batch 1000. Filtre directement cote API pour eviter de scanner
+ les ~5000 postes de travail (gain de temps x4).
+ Retourne liste de dicts {id, name, ip, os, last_check, agent_status}."""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return []
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
- all_assets = []
- last_id = 0
- while True:
- body = {"ServiceRequest": {"preferences": {"limitResults": 1000}}}
- if last_id:
- body["ServiceRequest"]["filters"] = {"Criteria": [
- {"field": "id", "operator": "GREATER", "value": str(last_id)}
- ]}
- try:
- r = requests.post(
- f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
- json=body, auth=(qualys_user, qualys_pass),
- verify=False, timeout=180, proxies=proxies,
- headers={"Content-Type": "application/json"}
- )
- except Exception:
- break
- if r.status_code != 200 or "SUCCESS" not in r.text:
- break
- batch = []
- for block in r.text.split("")[1:]:
- block = block.split("")[0]
- aid = (parse_xml(block, "id") or [""])[0]
- name = (parse_xml(block, "name") or [""])[0]
- addr = (parse_xml(block, "address") or [""])[0]
- os_str = (parse_xml(block, "os") or [""])[0]
- last_check = ""
- if "" in block:
- last_check = (parse_xml(block, "lastCheckedIn") or [""])[0]
- agent_status = ""
- if "" in block:
- agent_status = (parse_xml(block, "status") or [""])[0]
- if aid and aid.isdigit():
- batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str,
- "last_check": last_check, "agent_status": agent_status})
- if not batch:
- break
- all_assets.extend(batch)
- last_id = max(b["id"] for b in batch)
- if len(batch) < 1000:
- break
- return all_assets
+
+ def fetch_by_tag(tag_name):
+ results = []
+ last_id = 0
+ while True:
+ criteria = [{"field": "tagName", "operator": "EQUALS", "value": tag_name}]
+ if last_id:
+ criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
+ body = {"ServiceRequest": {"preferences": {"limitResults": 1000},
+ "filters": {"Criteria": criteria}}}
+ try:
+ r = requests.post(
+ f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
+ json=body, auth=(qualys_user, qualys_pass),
+ verify=False, timeout=180, proxies=proxies,
+ headers={"Content-Type": "application/json"}
+ )
+ except Exception:
+ break
+ if r.status_code != 200 or "SUCCESS" not in r.text:
+ break
+ batch = []
+ for block in r.text.split("")[1:]:
+ block = block.split("")[0]
+ aid = (parse_xml(block, "id") or [""])[0]
+ name = (parse_xml(block, "name") or [""])[0]
+ addr = (parse_xml(block, "address") or [""])[0]
+ os_str = (parse_xml(block, "os") or [""])[0]
+ last_check = ""
+ if "" in block:
+ last_check = (parse_xml(block, "lastCheckedIn") or [""])[0]
+ agent_status = ""
+ if "" in block:
+ agent_status = (parse_xml(block, "status") or [""])[0]
+ if aid and aid.isdigit():
+ batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str,
+ "last_check": last_check, "agent_status": agent_status})
+ if not batch:
+ break
+ results.extend(batch)
+ last_id = max(b["id"] for b in batch)
+ if len(batch) < 1000:
+ break
+ return results
+
+ # Linux Server + Windows Server (tags built-in Qualys, couvre tous les serveurs)
+ linux = fetch_by_tag("Linux Server")
+ windows = fetch_by_tag("Windows Server")
+ # Dedupe par id (un asset pourrait theoriquement avoir les 2 tags)
+ seen = set()
+ out = []
+ for a in linux + windows:
+ if a["id"] not in seen:
+ seen.add(a["id"])
+ out.append(a)
+ return out
def find_duplicate_hostnames(db, force_refresh=False):