From 67f123e9f5bcd3513594a7f2b09cd99e023d3272 Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Tue, 14 Apr 2026 15:13:48 +0200 Subject: [PATCH] Qualys refresh: pagination + per-row savepoint to isolate errors --- app/services/qualys_service.py | 171 ++++++++++++++++++++------------- 1 file changed, 104 insertions(+), 67 deletions(-) diff --git a/app/services/qualys_service.py b/app/services/qualys_service.py index ca6ce84..722b928 100644 --- a/app/services/qualys_service.py +++ b/app/services/qualys_service.py @@ -535,88 +535,125 @@ def get_cache_stats(): def refresh_all_agents(db): - """Rafraichit tous les agents depuis l'API Qualys QPS (bulk)""" + """Rafraichit tous les agents depuis l'API Qualys QPS (bulk, paginé)""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials Qualys non configurés"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None - stats = {"created": 0, "updated": 0, "errors": 0} + stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0} + last_id = None + max_pages = 20 # garde-fou + # Autocommit sur la session courante try: - r = requests.post( - f"{qualys_url}/qps/rest/2.0/search/am/hostasset", - json={"ServiceRequest": {"preferences": {"limitResults": 1000}, "filters": {"Criteria": [{"field": "tagName", "operator": "CONTAINS", "value": "server"}]}}}, - auth=(qualys_user, qualys_pass), - verify=False, timeout=120, proxies=proxies, - headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"}) - except Exception as e: - return {"ok": False, "msg": str(e)} + db.commit() # flush any pending + except Exception: + pass - if r.status_code != 200 or "SUCCESS" not in r.text: - return {"ok": False, "msg": f"API HTTP {r.status_code}"} - - for block in r.text.split("")[1:]: - block = block.split("")[0] + while stats["pages"] < max_pages: + stats["pages"] += 1 + criteria = [{"field": "tagName", "operator": "CONTAINS", "value": "server"}] + if last_id: + criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)}) + payload = {"ServiceRequest": { + "preferences": {"limitResults": 100}, + "filters": {"Criteria": criteria} + }} try: - asset_id = (parse_xml(block, "id") or [""])[0] - name = (parse_xml(block, "name") or [""])[0] - hostname = name.split(".")[0].lower() if name else "" - address = (parse_xml(block, "address") or [""])[0] - fqdn = (parse_xml(block, "fqdn") or [""])[0] - os_val = (parse_xml(block, "os") or [""])[0] + r = requests.post( + f"{qualys_url}/qps/rest/2.0/search/am/hostasset", + json=payload, auth=(qualys_user, qualys_pass), + verify=False, timeout=120, proxies=proxies, + headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"}) + except Exception as e: + return {"ok": False, "msg": f"page {stats['pages']}: {e}", **stats} - agent_status = "" - agent_version = "" - last_checkin = None - if "" in block: - ab = block[block.find(""):block.find("")] - agent_status = (parse_xml(ab, "status") or [""])[0] - agent_version = (parse_xml(ab, "agentVersion") or [""])[0] - lc = (parse_xml(ab, "lastCheckedIn") or [""])[0] - last_checkin = lc if lc else None + if r.status_code != 200 or "SUCCESS" not in r.text: + return {"ok": False, "msg": f"HTTP {r.status_code} page {stats['pages']}", **stats} - os_family = None - if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")): - os_family = "linux" - elif "windows" in os_val.lower(): - os_family = "windows" + blocks = r.text.split("")[1:] + if not blocks: + break - # Match server - srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), - {"h": hostname}).fetchone() - server_id = srv.id if srv else None - # Sync du FQDN Qualys vers servers.fqdn si présent - if server_id and fqdn: - db.execute(text("UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"), - {"fqdn": fqdn, "sid": server_id}) + new_last_id = last_id + for block in blocks: + block = block.split("")[0] + try: + asset_id = (parse_xml(block, "id") or [""])[0] + if asset_id and asset_id.isdigit(): + aid_int = int(asset_id) + if new_last_id is None or aid_int > new_last_id: + new_last_id = aid_int + name = (parse_xml(block, "name") or [""])[0] + hostname = name.split(".")[0].lower() if name else "" + address = (parse_xml(block, "address") or [""])[0] + fqdn = (parse_xml(block, "fqdn") or [""])[0] + os_val = (parse_xml(block, "os") or [""])[0] - existing = db.execute(text("SELECT id FROM qualys_assets WHERE qualys_asset_id=:qid"), - {"qid": int(asset_id)}).fetchone() + agent_status = "" + agent_version = "" + last_checkin = None + if "" in block: + ab = block[block.find(""):block.find("")] + agent_status = (parse_xml(ab, "status") or [""])[0] + agent_version = (parse_xml(ab, "agentVersion") or [""])[0] + lc = (parse_xml(ab, "lastCheckedIn") or [""])[0] + last_checkin = lc if lc else None - if existing: - db.execute(text("""UPDATE qualys_assets SET - name=:name, hostname=:hn, fqdn=:fqdn, ip_address=:ip, os=:os, os_family=:osf, - agent_status=:ast, agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now() - WHERE qualys_asset_id=:qid"""), - {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None, - "ip": address or None, "os": os_val, "osf": os_family, - "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id}) - stats["updated"] += 1 - else: - db.execute(text("""INSERT INTO qualys_assets - (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family, - agent_status, agent_version, last_checkin, server_id) - VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""), - {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None, - "ip": address or None, "os": os_val, "osf": os_family, - "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id}) - stats["created"] += 1 + os_family = None + if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")): + os_family = "linux" + elif "windows" in os_val.lower(): + os_family = "windows" - except Exception: - stats["errors"] += 1 + # Savepoint pour isoler les erreurs + try: + sp = db.begin_nested() + srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), + {"h": hostname}).fetchone() + server_id = srv.id if srv else None + if server_id and fqdn: + db.execute(text("UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"), + {"fqdn": fqdn, "sid": server_id}) + + existing = db.execute(text("SELECT id FROM qualys_assets WHERE qualys_asset_id=:qid"), + {"qid": int(asset_id)}).fetchone() + + if existing: + db.execute(text("""UPDATE qualys_assets SET + name=:name, hostname=:hn, fqdn=:fqdn, ip_address=:ip, os=:os, os_family=:osf, + agent_status=:ast, agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now() + WHERE qualys_asset_id=:qid"""), + {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None, + "ip": address or None, "os": os_val, "osf": os_family, + "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id}) + stats["updated"] += 1 + else: + db.execute(text("""INSERT INTO qualys_assets + (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family, + agent_status, agent_version, last_checkin, server_id) + VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""), + {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None, + "ip": address or None, "os": os_val, "osf": os_family, + "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id}) + stats["created"] += 1 + sp.commit() + except Exception: + try: sp.rollback() + except Exception: pass + stats["errors"] += 1 + except Exception: + stats["errors"] += 1 + + db.commit() + + if "true" not in r.text: + break + if new_last_id == last_id: + break + last_id = new_last_id - db.commit() stats["ok"] = True - stats["msg"] = f"{stats['created']} créés, {stats['updated']} mis à jour" + stats["msg"] = f"{stats['created']} créés, {stats['updated']} mis à jour ({stats['pages']} pages, {stats['errors']} erreurs)" return stats