diff --git a/app/services/qualys_service.py b/app/services/qualys_service.py
index f73092f..c5319ba 100644
--- a/app/services/qualys_service.py
+++ b/app/services/qualys_service.py
@@ -565,15 +565,25 @@ def refresh_all_agents(db, mode="diff"):
def _refresh_all_agents_impl(db, mode="diff"):
- """Implémentation réelle du refresh (appelée sous verrou)"""
+ """Implémentation réelle du refresh (appelée sous verrou).
+
+ Optimisations 2026-04-16 (audit perf /qualys/agents):
+ - Pré-chargement servers en dict Python (O(1) au lieu de 2 queries SQL par asset)
+ - UPSERT INSERT ... ON CONFLICT DO UPDATE (une seule query au lieu de SELECT+INSERT/UPDATE)
+ - HTTP Session réutilisée (keep-alive, évite le handshake SSL à chaque page)
+ - 5 filtres tagName exécutés en parallèle via ThreadPoolExecutor
+ - Dédup par asset_id (un asset couvert par plusieurs tags n'est traité qu'une fois)
+ - Commit unique en fin de traitement (au lieu de savepoint + commit par asset)
+ - max_pages par filtre étendu (30 → 500) pour éviter syncs incomplets silencieux
+ """
from .secrets_service import get_secret, set_secret
from datetime import datetime, timezone
+ from concurrent.futures import ThreadPoolExecutor, as_completed
# En mode diff : recupere le timestamp du dernier diff sync
last_diff_iso = None
if mode == "diff":
last_diff_iso = get_secret(db, "qualys_last_diff_sync")
- # Early exit seulement en diff : si tous recents ET dernier diff < 30 min
total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar() or 0
if total > 0 and last_diff_iso:
try:
@@ -585,8 +595,8 @@ def _refresh_all_agents_impl(db, mode="diff"):
except Exception:
pass
- # Sauve immediatement le timestamp de DEBUT (couvre annulation en cours)
- # Format Qualys QPS: YYYY-MM-DDTHH:MM:SSZ (sans microsec)
+ # Sauve immediatement le timestamp de DEBUT (couvre annulation + evite de rater
+ # les updates qui se produisent pendant la sync elle-meme)
if mode == "diff":
try:
start_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
@@ -600,182 +610,233 @@ def _refresh_all_agents_impl(db, mode="diff"):
return {"ok": False, "msg": "Credentials Qualys non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
- stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0}
- last_id = None
- max_pages = 30 # garde-fou par filtre (multi-pass = max_pages * nb_filtres)
+ stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0, "skipped": 0}
- # Autocommit sur la session courante
+ # flush eventuel pending
try:
- db.commit() # flush any pending
+ db.commit()
except Exception:
pass
- # Multi-pass : plusieurs filtres tagName CONTAINS pour couvrir la
- # nomenclature heterogene (SRV, server, SED, SEI, EMV...).
- # Doublons gere par ON CONFLICT/UPDATE sur qualys_asset_id.
- tag_filters = ["SRV", "server", "SED", "SEI", "EMV"]
- current_filter_idx = 0
- last_id = None
+ # --- OPTIM 1 : pré-chargement des servers en RAM -----------------------
+ # Remplace 2 queries SQL par asset par un lookup dict O(1).
+ servers_by_hostname = {}
+ servers_by_ip = {}
+ servers_need_fqdn = set() # ids des servers ayant fqdn NULL ou '' (backfill cible)
+ try:
+ for row in db.execute(text(
+ "SELECT id, LOWER(hostname) AS hn, (fqdn IS NULL OR fqdn='') AS needs_fqdn FROM servers"
+ )).fetchall():
+ if row.hn:
+ servers_by_hostname[row.hn] = row.id
+ if row.needs_fqdn:
+ servers_need_fqdn.add(row.id)
+ for row in db.execute(text(
+ "SELECT s.id, si.ip_address::text AS ip FROM servers s JOIN server_ips si ON si.server_id=s.id"
+ )).fetchall():
+ if row.ip:
+ servers_by_ip[row.ip] = row.id
+ except Exception:
+ pass
- while current_filter_idx < len(tag_filters):
- tag_filter = tag_filters[current_filter_idx]
- if _refresh_cancel.is_set():
- stats["ok"] = True
- stats["cancelled"] = True
- stats["msg"] = f"Annulé apres {stats['pages']} pages (filtre={tag_filter}): {stats['created']} créés, {stats['updated']} mis à jour"
- return stats
- stats["pages"] += 1
- criteria = [{"field": "tagName", "operator": "CONTAINS", "value": tag_filter}]
- if last_id:
- criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
- # Mode diff : filtre 'updated' > dernier sync (champ searchable Qualys QPS)
- if mode == "diff" and last_diff_iso:
- criteria.append({"field": "updated", "operator": "GREATER", "value": last_diff_iso})
- payload = {"ServiceRequest": {
- "preferences": {"limitResults": 100},
- "filters": {"Criteria": criteria}
- }}
- # Retry sur erreurs transitoires proxy/reseau (max 3 essais)
- r = None
- last_err = None
- for attempt in range(3):
- try:
- r = requests.post(
- f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
- json=payload, auth=(qualys_user, qualys_pass),
- verify=False, timeout=600, proxies=proxies,
- headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
- break
- except Exception as e:
- last_err = e
- import time as _t
- _t.sleep(2 * (attempt + 1)) # backoff 2s, 4s
- if r is None:
- return {"ok": False, "msg": f"page {stats['pages']} apres 3 essais: {last_err}", **stats}
+ # --- OPTIM 2 : HTTP Session réutilisée ---------------------------------
+ # Évite d'ouvrir une nouvelle connexion SSL + auth à chaque page.
+ sess = requests.Session()
+ sess.auth = (qualys_user, qualys_pass)
+ sess.headers.update({"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
+ sess.verify = False
+ if proxies:
+ sess.proxies.update(proxies)
- if r.status_code != 200 or "SUCCESS" not in r.text:
- return {"ok": False, "msg": f"HTTP {r.status_code} page {stats['pages']}", **stats}
+ MAX_PAGES_PER_FILTER = 500 # 500 x 100 = 50 000 assets/filtre (garde-fou large)
-        blocks = r.text.split("<HostAsset>")[1:]
- if not blocks:
- break
-
- new_last_id = last_id
- for block in blocks:
-            block = block.split("</HostAsset>")[0]
- try:
- asset_id = (parse_xml(block, "id") or [""])[0]
- if asset_id and asset_id.isdigit():
- aid_int = int(asset_id)
- if new_last_id is None or aid_int > new_last_id:
- new_last_id = aid_int
- name = (parse_xml(block, "name") or [""])[0]
- address = (parse_xml(block, "address") or [""])[0]
- fqdn = (parse_xml(block, "fqdn") or [""])[0]
- netbios = (parse_xml(block, "netbiosName") or [""])[0]
- os_val = (parse_xml(block, "os") or [""])[0]
- # Priorite name (Qualys display) sauf si c'est une IP ou vide -> FQDN -> NetBIOS
- import re as _re
- def _valid(h):
- if not h:
- return False
- h = h.strip().lower()
- if h in ("localhost", ""):
- return False
- if _re.match(r"^\d+\.\d+\.\d+\.\d+$", h):
- return False
- return True
- hostname_src = name if _valid(name.split(".")[0]) else (fqdn if _valid(fqdn.split(".")[0]) else netbios)
- hostname = hostname_src.split(".")[0].lower() if hostname_src else ""
-
- agent_status = ""
- agent_version = ""
- last_checkin = None
-            if "<agentInfo>" in block:
-                ab = block[block.find("<agentInfo>"):block.find("</agentInfo>")]
- agent_status = (parse_xml(ab, "status") or [""])[0]
- agent_version = (parse_xml(ab, "agentVersion") or [""])[0]
- lc = (parse_xml(ab, "lastCheckedIn") or [""])[0]
- last_checkin = lc if lc else None
-
- os_family = None
- if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")):
- os_family = "linux"
- elif "windows" in os_val.lower():
- os_family = "windows"
-
- # Savepoint pour isoler les erreurs
+ def fetch_filter_pages(tag_filter):
+ """Récupère tous les blocks XML pour un filtre tag donné.
+ Exécuté en thread worker (pas d'accès à `db` → thread-safe)."""
+ blocks_out = []
+ local_pages = 0
+ f_last_id = None
+ while local_pages < MAX_PAGES_PER_FILTER:
+ if _refresh_cancel.is_set():
+ return blocks_out, local_pages
+ criteria = [{"field": "tagName", "operator": "CONTAINS", "value": tag_filter}]
+ if f_last_id:
+ criteria.append({"field": "id", "operator": "GREATER", "value": str(f_last_id)})
+ if mode == "diff" and last_diff_iso:
+ criteria.append({"field": "updated", "operator": "GREATER", "value": last_diff_iso})
+ payload = {"ServiceRequest": {
+ "preferences": {"limitResults": 100},
+ "filters": {"Criteria": criteria}
+ }}
+ r = None
+ for attempt in range(3):
try:
- sp = db.begin_nested()
- srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"),
- {"h": hostname}).fetchone()
- server_id = srv.id if srv else None
- # Fallback : matching par IP si hostname ne match pas
- if not server_id and address:
- ip_match = db.execute(text(
- "SELECT s.id FROM servers s JOIN server_ips si ON si.server_id=s.id "
- "WHERE si.ip_address = CAST(:ip AS inet) LIMIT 1"
- ), {"ip": address}).fetchone()
- if ip_match:
- server_id = ip_match.id
- if server_id and fqdn:
- db.execute(text("UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"),
- {"fqdn": fqdn, "sid": server_id})
-
- existing = db.execute(text("SELECT id, updated_at FROM qualys_assets WHERE qualys_asset_id=:qid"),
- {"qid": int(asset_id)}).fetchone()
-
- # Skip si déjà mis à jour dans les 5 dernières minutes
- if existing and existing.updated_at:
- from datetime import datetime, timezone, timedelta
- try:
- age = (datetime.now(timezone.utc) - existing.updated_at).total_seconds()
- if age < 2400:
- stats["skipped"] = stats.get("skipped", 0) + 1
- sp.commit()
- continue
- except Exception:
- pass
-
- if existing:
- db.execute(text("""UPDATE qualys_assets SET
- name=:name, hostname=:hn, fqdn=:fqdn, ip_address=:ip, os=:os, os_family=:osf,
- agent_status=:ast, agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now()
- WHERE qualys_asset_id=:qid"""),
- {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
- "ip": address or None, "os": os_val, "osf": os_family,
- "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
- stats["updated"] += 1
- else:
- db.execute(text("""INSERT INTO qualys_assets
- (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
- agent_status, agent_version, last_checkin, server_id)
- VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""),
- {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
- "ip": address or None, "os": os_val, "osf": os_family,
- "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
- stats["created"] += 1
- sp.commit()
+ r = sess.post(
+ f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
+ json=payload, timeout=600)
+ break
except Exception:
- try: sp.rollback()
- except Exception: pass
- stats["errors"] += 1
+ import time as _t
+ _t.sleep(2 * (attempt + 1))
+ if r is None or r.status_code != 200 or "SUCCESS" not in r.text:
+ break
+ local_pages += 1
+
+            page_blocks = r.text.split("<HostAsset>")[1:]
+ if not page_blocks:
+ break
+
+ new_last_id = f_last_id
+ for block in page_blocks:
+                block_end = block.split("</HostAsset>")[0]
+ blocks_out.append(block_end)
+                m = re.search(r"<id>(\d+)</id>", block_end)
+ if m:
+ aid = int(m.group(1))
+ if new_last_id is None or aid > new_last_id:
+ new_last_id = aid
+
+            if "<hasMoreRecords>true</hasMoreRecords>" not in r.text or new_last_id == f_last_id:
+ break
+ f_last_id = new_last_id
+ return blocks_out, local_pages
+
+ # --- OPTIM 3 : 5 filtres en parallèle ---------------------------------
+ tag_filters = ["SRV", "server", "SED", "SEI", "EMV"]
+ all_blocks = []
+ seen_asset_ids = set()
+
+ with ThreadPoolExecutor(max_workers=len(tag_filters)) as executor:
+ futures = {executor.submit(fetch_filter_pages, tf): tf for tf in tag_filters}
+ for future in as_completed(futures):
+ try:
+ blocks, page_count = future.result()
+ stats["pages"] += page_count
+ # Dédup par asset_id (un asset avec tags SRV + server apparait 2 fois)
+ for b in blocks:
+                    m = re.search(r"<id>(\d+)</id>", b)
+ if m:
+ aid = int(m.group(1))
+ if aid not in seen_asset_ids:
+ seen_asset_ids.add(aid)
+ all_blocks.append(b)
except Exception:
stats["errors"] += 1
- db.commit()
+ if _refresh_cancel.is_set():
+ stats["ok"] = True
+ stats["cancelled"] = True
+ stats["msg"] = f"Annulé apres fetch ({stats['pages']} pages, {len(all_blocks)} assets uniques)"
+ return stats
- # Filtre actuel: page suivante OU passe au prochain filtre
-        if "<hasMoreRecords>true</hasMoreRecords>" not in r.text or new_last_id == last_id:
- current_filter_idx += 1
- last_id = None
- else:
- last_id = new_last_id
+ # --- OPTIM 4 : UPSERT INSERT ... ON CONFLICT DO UPDATE -----------------
+ # RETURNING (xmax = 0) AS inserted : true = INSERT, false = UPDATE
+ # (xmax=0 sur une ligne fraichement insérée, != 0 quand update)
+ UPSERT_SQL = text("""
+ INSERT INTO qualys_assets
+ (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
+ agent_status, agent_version, last_checkin, server_id)
+ VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
+ ON CONFLICT (qualys_asset_id) DO UPDATE SET
+ name = EXCLUDED.name,
+ hostname = EXCLUDED.hostname,
+ fqdn = EXCLUDED.fqdn,
+ ip_address = EXCLUDED.ip_address,
+ os = EXCLUDED.os,
+ os_family = EXCLUDED.os_family,
+ agent_status = EXCLUDED.agent_status,
+ agent_version = EXCLUDED.agent_version,
+ last_checkin = EXCLUDED.last_checkin,
+ server_id = EXCLUDED.server_id,
+ updated_at = now()
+ RETURNING (xmax = 0) AS inserted
+ """)
+
+ FQDN_UPDATE_SQL = text(
+ "UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"
+ )
+
+ def _valid(h):
+ if not h:
+ return False
+ h = h.strip().lower()
+ if h in ("localhost", ""):
+ return False
+ if re.match(r"^\d+\.\d+\.\d+\.\d+$", h):
+ return False
+ return True
+
+ # Traitement séquentiel (session DB non thread-safe, mais plus de 2 queries/asset)
+ for block in all_blocks:
+ if _refresh_cancel.is_set():
+ break
+ try:
+ asset_id = (parse_xml(block, "id") or [""])[0]
+ if not asset_id or not asset_id.isdigit():
+ continue
+ name = (parse_xml(block, "name") or [""])[0]
+ address = (parse_xml(block, "address") or [""])[0]
+ fqdn = (parse_xml(block, "fqdn") or [""])[0]
+ netbios = (parse_xml(block, "netbiosName") or [""])[0]
+ os_val = (parse_xml(block, "os") or [""])[0]
+ hostname_src = name if _valid(name.split(".")[0]) else (fqdn if _valid(fqdn.split(".")[0]) else netbios)
+ hostname = hostname_src.split(".")[0].lower() if hostname_src else ""
+
+ agent_status = ""
+ agent_version = ""
+ last_checkin = None
+            if "<agentInfo>" in block:
+                ab = block[block.find("<agentInfo>"):block.find("</agentInfo>")]
+ agent_status = (parse_xml(ab, "status") or [""])[0]
+ agent_version = (parse_xml(ab, "agentVersion") or [""])[0]
+ lc = (parse_xml(ab, "lastCheckedIn") or [""])[0]
+ last_checkin = lc if lc else None
+
+ os_family = None
+ if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")):
+ os_family = "linux"
+ elif "windows" in os_val.lower():
+ os_family = "windows"
+
+ # Lookup O(1) au lieu de 2 queries SQL
+ server_id = servers_by_hostname.get(hostname) if hostname else None
+ if not server_id and address:
+ server_id = servers_by_ip.get(address)
+
+ # FQDN backfill ciblé (seulement si le server en a besoin)
+ if server_id and fqdn and server_id in servers_need_fqdn:
+ try:
+ db.execute(FQDN_UPDATE_SQL, {"fqdn": fqdn, "sid": server_id})
+ servers_need_fqdn.discard(server_id)
+ except Exception:
+ pass
+
+ result = db.execute(UPSERT_SQL, {
+ "qid": int(asset_id), "name": name, "hn": hostname,
+ "fqdn": fqdn or None, "ip": address or None,
+ "os": os_val, "osf": os_family, "ast": agent_status,
+ "av": agent_version, "lc": last_checkin, "sid": server_id,
+ })
+ row = result.fetchone()
+ if row and row.inserted:
+ stats["created"] += 1
+ else:
+ stats["updated"] += 1
+ except Exception:
+ stats["errors"] += 1
+
+ # Commit final unique
+ try:
+ db.commit()
+ except Exception:
+ try: db.rollback()
+ except Exception: pass
stats["ok"] = True
stats["mode"] = mode
- stats["msg"] = f"[{mode}] {stats['created']} créés, {stats['updated']} mis à jour ({stats['pages']} pages, {stats['errors']} erreurs, {len(tag_filters)} filtres)"
- # Memorise le timestamp pour le prochain diff sync (format sans microsec)
+ stats["msg"] = (f"[{mode}] {stats['created']} créés, {stats['updated']} mis à jour "
+ f"({stats['pages']} pages // {len(tag_filters)} filtres, "
+ f"{len(all_blocks)} assets uniques, {stats['errors']} erreurs)")
if mode == "diff":
try:
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")