Qualys refresh: pagination + per-row savepoint to isolate errors

This commit is contained in:
Pierre & Lumière 2026-04-14 15:13:48 +02:00
parent a422894f83
commit 67f123e9f5

View File

@ -535,88 +535,125 @@ def get_cache_stats():
def refresh_all_agents(db): def refresh_all_agents(db):
"""Rafraichit tous les agents depuis l'API Qualys QPS (bulk)""" """Rafraichit tous les agents depuis l'API Qualys QPS (bulk, paginé)"""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user: if not qualys_user:
return {"ok": False, "msg": "Credentials Qualys non configurés"} return {"ok": False, "msg": "Credentials Qualys non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
stats = {"created": 0, "updated": 0, "errors": 0} stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0}
last_id = None
max_pages = 20 # garde-fou
# Autocommit sur la session courante
try: try:
r = requests.post( db.commit() # flush any pending
f"{qualys_url}/qps/rest/2.0/search/am/hostasset", except Exception:
json={"ServiceRequest": {"preferences": {"limitResults": 1000}, "filters": {"Criteria": [{"field": "tagName", "operator": "CONTAINS", "value": "server"}]}}}, pass
auth=(qualys_user, qualys_pass),
verify=False, timeout=120, proxies=proxies,
headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
except Exception as e:
return {"ok": False, "msg": str(e)}
if r.status_code != 200 or "SUCCESS" not in r.text: while stats["pages"] < max_pages:
return {"ok": False, "msg": f"API HTTP {r.status_code}"} stats["pages"] += 1
criteria = [{"field": "tagName", "operator": "CONTAINS", "value": "server"}]
for block in r.text.split("<HostAsset>")[1:]: if last_id:
block = block.split("</HostAsset>")[0] criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
payload = {"ServiceRequest": {
"preferences": {"limitResults": 100},
"filters": {"Criteria": criteria}
}}
try: try:
asset_id = (parse_xml(block, "id") or [""])[0] r = requests.post(
name = (parse_xml(block, "name") or [""])[0] f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
hostname = name.split(".")[0].lower() if name else "" json=payload, auth=(qualys_user, qualys_pass),
address = (parse_xml(block, "address") or [""])[0] verify=False, timeout=120, proxies=proxies,
fqdn = (parse_xml(block, "fqdn") or [""])[0] headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
os_val = (parse_xml(block, "os") or [""])[0] except Exception as e:
return {"ok": False, "msg": f"page {stats['pages']}: {e}", **stats}
agent_status = "" if r.status_code != 200 or "SUCCESS" not in r.text:
agent_version = "" return {"ok": False, "msg": f"HTTP {r.status_code} page {stats['pages']}", **stats}
last_checkin = None
if "<agentInfo>" in block:
ab = block[block.find("<agentInfo>"):block.find("</agentInfo>")]
agent_status = (parse_xml(ab, "status") or [""])[0]
agent_version = (parse_xml(ab, "agentVersion") or [""])[0]
lc = (parse_xml(ab, "lastCheckedIn") or [""])[0]
last_checkin = lc if lc else None
os_family = None blocks = r.text.split("<HostAsset>")[1:]
if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")): if not blocks:
os_family = "linux" break
elif "windows" in os_val.lower():
os_family = "windows"
# Match server new_last_id = last_id
srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), for block in blocks:
{"h": hostname}).fetchone() block = block.split("</HostAsset>")[0]
server_id = srv.id if srv else None try:
# Sync du FQDN Qualys vers servers.fqdn si présent asset_id = (parse_xml(block, "id") or [""])[0]
if server_id and fqdn: if asset_id and asset_id.isdigit():
db.execute(text("UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"), aid_int = int(asset_id)
{"fqdn": fqdn, "sid": server_id}) if new_last_id is None or aid_int > new_last_id:
new_last_id = aid_int
name = (parse_xml(block, "name") or [""])[0]
hostname = name.split(".")[0].lower() if name else ""
address = (parse_xml(block, "address") or [""])[0]
fqdn = (parse_xml(block, "fqdn") or [""])[0]
os_val = (parse_xml(block, "os") or [""])[0]
existing = db.execute(text("SELECT id FROM qualys_assets WHERE qualys_asset_id=:qid"), agent_status = ""
{"qid": int(asset_id)}).fetchone() agent_version = ""
last_checkin = None
if "<agentInfo>" in block:
ab = block[block.find("<agentInfo>"):block.find("</agentInfo>")]
agent_status = (parse_xml(ab, "status") or [""])[0]
agent_version = (parse_xml(ab, "agentVersion") or [""])[0]
lc = (parse_xml(ab, "lastCheckedIn") or [""])[0]
last_checkin = lc if lc else None
if existing: os_family = None
db.execute(text("""UPDATE qualys_assets SET if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")):
name=:name, hostname=:hn, fqdn=:fqdn, ip_address=:ip, os=:os, os_family=:osf, os_family = "linux"
agent_status=:ast, agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now() elif "windows" in os_val.lower():
WHERE qualys_asset_id=:qid"""), os_family = "windows"
{"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
"ip": address or None, "os": os_val, "osf": os_family,
"ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
stats["updated"] += 1
else:
db.execute(text("""INSERT INTO qualys_assets
(qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""),
{"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
"ip": address or None, "os": os_val, "osf": os_family,
"ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
stats["created"] += 1
except Exception: # Savepoint pour isoler les erreurs
stats["errors"] += 1 try:
sp = db.begin_nested()
srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"),
{"h": hostname}).fetchone()
server_id = srv.id if srv else None
if server_id and fqdn:
db.execute(text("UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"),
{"fqdn": fqdn, "sid": server_id})
existing = db.execute(text("SELECT id FROM qualys_assets WHERE qualys_asset_id=:qid"),
{"qid": int(asset_id)}).fetchone()
if existing:
db.execute(text("""UPDATE qualys_assets SET
name=:name, hostname=:hn, fqdn=:fqdn, ip_address=:ip, os=:os, os_family=:osf,
agent_status=:ast, agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now()
WHERE qualys_asset_id=:qid"""),
{"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
"ip": address or None, "os": os_val, "osf": os_family,
"ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
stats["updated"] += 1
else:
db.execute(text("""INSERT INTO qualys_assets
(qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""),
{"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
"ip": address or None, "os": os_val, "osf": os_family,
"ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
stats["created"] += 1
sp.commit()
except Exception:
try: sp.rollback()
except Exception: pass
stats["errors"] += 1
except Exception:
stats["errors"] += 1
db.commit()
if "<hasMoreRecords>true</hasMoreRecords>" not in r.text:
break
if new_last_id == last_id:
break
last_id = new_last_id
db.commit()
stats["ok"] = True stats["ok"] = True
stats["msg"] = f"{stats['created']} créés, {stats['updated']} mis à jour" stats["msg"] = f"{stats['created']} créés, {stats['updated']} mis à jour ({stats['pages']} pages, {stats['errors']} erreurs)"
return stats return stats