"""Service Qualys — sync tags pour un serveur via API + cache memoire""" import re import requests import threading _refresh_lock = threading.Lock() _refresh_running = False _refresh_cancel = threading.Event() import urllib3 from sqlalchemy import text from .secrets_service import get_secret from . import cache as _cache urllib3.disable_warnings() CACHE_TTL = 600 # 10 minutes def _get_qualys_creds(db): """Recupere les credentials Qualys depuis les secrets chiffres""" url = get_secret(db, "qualys_url") or "https://qualysapi.qualys.eu" user = get_secret(db, "qualys_user") or "" pwd = get_secret(db, "qualys_pass") or "" proxy = get_secret(db, "qualys_proxy") or "" bypass = (get_secret(db, "qualys_bypass_proxy") or "").lower() == "true" if bypass: proxy = "" return url, user, pwd, proxy def parse_xml(txt, tag): return re.findall(f"<{tag}>([^<]*)", txt) def search_assets_api(db, query, field="name", operator="CONTAINS", force_refresh=False): """Recherche des assets via l'API Qualys — cache 10 min""" cache_key = f"qualys:search:{field}:{query}" if not force_refresh: cached = _cache.get(cache_key) if cached is not None: cached["msg"] = cached.get("msg", "") + " (cache)" cached["from_cache"] = True return cached qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials Qualys non configurés", "assets": []} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/2.0/search/am/hostasset", json={"ServiceRequest": { "preferences": {"limitResults": 200}, "filters": {"Criteria": [ {"field": field, "operator": operator, "value": query} ]} }}, auth=(qualys_user, qualys_pass), verify=False, timeout=60, proxies=proxies, headers={"Content-Type": "application/json"} ) except Exception as e: return {"ok": False, "msg": f"Erreur API: {e}", "assets": []} if r.status_code != 200 or "SUCCESS" not in r.text: return {"ok": False, "msg": f"API 
HTTP {r.status_code}", "assets": []} assets = _parse_assets_full(r.text) result = {"ok": True, "msg": f"{len(assets)} résultat(s)", "assets": assets, "from_cache": False} _cache.set(cache_key, result, CACHE_TTL) return result def get_all_tags_api(db): """Récupère tous les tags depuis l'API Qualys""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials non configurés", "tags": []} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/2.0/search/am/tag", json={"ServiceRequest": {"preferences": {"limitResults": 1000}}}, auth=(qualys_user, qualys_pass), verify=False, timeout=60, proxies=proxies, headers={"Content-Type": "application/json"} ) except Exception as e: return {"ok": False, "msg": str(e), "tags": []} if r.status_code != 200 or "SUCCESS" not in r.text: return {"ok": False, "msg": f"HTTP {r.status_code}", "tags": []} tags = [] for block in r.text.split("")[1:]: block = block.split("")[0] tid = (parse_xml(block, "id") or [""])[0] tname = (parse_xml(block, "name") or [""])[0] rule_type = (parse_xml(block, "ruleType") or [""])[0] if tid and tname: tags.append({"id": int(tid), "name": tname, "is_dynamic": bool(rule_type), "rule_type": rule_type}) return {"ok": True, "msg": f"{len(tags)} tags", "tags": tags} def _parse_assets_full(text): """Parse le XML Qualys en liste de dicts enrichis""" assets = [] for block in text.split("")[1:]: block = block.split("")[0] aid = (parse_xml(block, "id") or [""])[0] name = (parse_xml(block, "name") or [""])[0] fqdn = (parse_xml(block, "fqdn") or [""])[0] address = (parse_xml(block, "address") or [""])[0] os_val = (parse_xml(block, "os") or [""])[0] agent_status = "" agent_version = "" last_checkin = "" if "" in block: agent_status = (parse_xml(block, "status") or [""])[0] agent_version = (parse_xml(block, "agentVersion") or [""])[0] last_checkin = (parse_xml(block, 
"lastCheckedIn") or [""])[0] # Tags tags = [] if "" in block: tag_block = block.split("")[1].split("")[0] tag_names = parse_xml(tag_block, "name") tags = tag_names netbios = (parse_xml(block, "netbiosName") or [""])[0] hostname_src = fqdn or netbios or name hostname = hostname_src.split(".")[0].lower() if hostname_src else "" assets.append({ "qualys_asset_id": int(aid) if aid else None, "name": name, "hostname": hostname, "fqdn": fqdn, "ip_address": address, "os": os_val, "agent_status": agent_status, "agent_version": agent_version, "last_checkin": last_checkin, "tags": tags, "tags_list": ", ".join(tags), }) return assets def create_tag_api(db, tag_name): """Crée un tag statique dans Qualys via API""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials non configurés"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/2.0/create/am/tag", json={"ServiceRequest": {"data": {"Tag": {"name": tag_name}}}}, auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies, headers={"Content-Type": "application/json"}) if r.status_code == 200 and "SUCCESS" in r.text: tid = (parse_xml(r.text, "id") or [""])[0] if tid: db.execute(text(""" INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic) VALUES (:tid, :n, false) ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name """), {"tid": int(tid), "n": tag_name}) db.commit() return {"ok": True, "msg": f"Tag '{tag_name}' créé (ID: {tid})"} return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"} except Exception as e: return {"ok": False, "msg": str(e)} def delete_tag_api(db, qualys_tag_id): """Supprime un tag dans Qualys via API""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials non configurés"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if 
qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/2.0/delete/am/tag/{qualys_tag_id}", auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies, headers={"Content-Type": "application/json"}) if r.status_code == 200 and "SUCCESS" in r.text: db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id}) db.execute(text("DELETE FROM qualys_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id}) db.commit() return {"ok": True, "msg": "Tag supprimé"} return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"} except Exception as e: return {"ok": False, "msg": str(e)} def add_tag_to_asset_api(db, asset_id, tag_id): """Ajoute un tag à un asset via API Qualys""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials non configurés"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}", json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"add": {"TagSimple": {"id": tag_id}}}}}}}, auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies, headers={"Content-Type": "application/json"}) if r.status_code == 200 and "SUCCESS" in r.text: db.execute(text(""" INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id) VALUES (:aid, :tid) ON CONFLICT DO NOTHING """), {"aid": asset_id, "tid": tag_id}) db.commit() return {"ok": True, "msg": "Tag ajouté"} return {"ok": False, "msg": f"Erreur: {r.text[:200]}"} except Exception as e: return {"ok": False, "msg": str(e)} def remove_tag_from_asset_api(db, asset_id, tag_id): """Retire un tag d'un asset via API Qualys""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials non configurés"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None 
try: r = requests.post( f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}", json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"remove": {"TagSimple": {"id": tag_id}}}}}}}, auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies, headers={"Content-Type": "application/json"}) if r.status_code == 200 and "SUCCESS" in r.text: db.execute(text(""" DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :aid AND qualys_tag_id = :tid """), {"aid": asset_id, "tid": tag_id}) db.commit() return {"ok": True, "msg": "Tag retiré"} return {"ok": False, "msg": f"Erreur: {r.text[:200]}"} except Exception as e: return {"ok": False, "msg": str(e)} def resync_all_tags(db): """Resync tous les tags depuis l'API Qualys vers la base locale""" result = get_all_tags_api(db) if not result["ok"]: return result count = 0 for t in result["tags"]: db.execute(text(""" INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic, rule_type) VALUES (:tid, :n, :dyn, :rt) ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name, is_dynamic = EXCLUDED.is_dynamic, rule_type = EXCLUDED.rule_type, updated_at = now() """), {"tid": t["id"], "n": t["name"], "dyn": t["is_dynamic"], "rt": t.get("rule_type")}) count += 1 db.commit() return {"ok": True, "msg": f"{count} tags synchronisés"} def sync_server_qualys(db, server_id): """Sync les tags Qualys pour un serveur donne. 
Retourne un dict resultat.""" row = db.execute(text( "SELECT hostname, qualys_asset_id FROM servers WHERE id = :id" ), {"id": server_id}).fetchone() if not row: return {"ok": False, "msg": "Serveur introuvable"} hostname = row.hostname qid = row.qualys_asset_id qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials Qualys non configures (Settings)"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None # Chercher l'asset par hostname si pas de qualys_asset_id if not qid: qid = _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies) if not qid: return {"ok": False, "msg": f"Asset '{hostname}' non trouve dans Qualys"} db.execute(text("UPDATE servers SET qualys_asset_id = :qid WHERE id = :id"), {"qid": qid, "id": server_id}) # Recuperer l'asset complet avec tags try: r = requests.post( f"{qualys_url}/qps/rest/2.0/search/am/hostasset", json={"ServiceRequest": { "filters": {"Criteria": [ {"field": "id", "operator": "EQUALS", "value": str(qid)} ]} }}, auth=(qualys_user, qualys_pass), verify=False, timeout=60, proxies=proxies, headers={"Content-Type": "application/json"} ) except Exception as e: return {"ok": False, "msg": f"Erreur API: {e}"} if r.status_code != 200 or "SUCCESS" not in r.text: return {"ok": False, "msg": f"API HTTP {r.status_code}"} # Parser asset blocks = r.text.split("") if len(blocks) < 2: return {"ok": False, "msg": "Asset non trouve dans la reponse"} block = blocks[1].split("")[0] fqdn = (parse_xml(block, "fqdn") or [""])[0] address = (parse_xml(block, "address") or [""])[0] os_val = (parse_xml(block, "os") or [""])[0] agent_status = (parse_xml(block, "status") or [""])[0] if "" in block else "" agent_version = (parse_xml(block, "agentVersion") or [""])[0] last_checkin = (parse_xml(block, "lastCheckedIn") or [""])[0] or None os_family = None os_low = os_val.lower() if any(k in os_low for k in ("linux", "red hat", 
"centos", "debian")): os_family = "linux" elif "windows" in os_low: os_family = "windows" # Update qualys_assets db.execute(text(""" INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family, agent_status, agent_version, last_checkin, server_id) VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid) ON CONFLICT (qualys_asset_id) DO UPDATE SET fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address, os=EXCLUDED.os, os_family=EXCLUDED.os_family, agent_status=EXCLUDED.agent_status, agent_version=EXCLUDED.agent_version, last_checkin=EXCLUDED.last_checkin, updated_at=now() """), {"qid": qid, "name": hostname, "hn": hostname.split(".")[0].lower(), "fqdn": fqdn or None, "ip": address or None, "os": os_val, "osf": os_family, "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id}) # Enrichir servers db.execute(text(""" UPDATE servers SET fqdn = COALESCE(NULLIF(:fqdn, ''), fqdn), os_version = COALESCE(NULLIF(:os, ''), os_version) WHERE id = :id """), {"fqdn": fqdn, "os": os_val, "id": server_id}) # Tags tag_count = 0 if "" in block: tag_block = block.split("")[1].split("")[0] tag_ids = parse_xml(tag_block, "id") tag_names = parse_xml(tag_block, "name") # Supprimer anciens liens db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"), {"qid": qid}) for tid, tname in zip(tag_ids, tag_names): # Upsert tag db.execute(text(""" INSERT INTO qualys_tags (qualys_tag_id, name) VALUES (:tid, :tn) ON CONFLICT (qualys_tag_id) DO UPDATE SET name=EXCLUDED.name, updated_at=now() """), {"tid": int(tid), "tn": tname}) # Lien asset-tag db.execute(text(""" INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id) VALUES (:qid, :tid) ON CONFLICT DO NOTHING """), {"qid": qid, "tid": int(tid)}) tag_count += 1 db.commit() return {"ok": True, "msg": f"Synchro OK — {tag_count} tags", "tags": tag_count} def _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies=None): """Cherche un 
asset Qualys par hostname""" try: r = requests.post( f"{qualys_url}/qps/rest/2.0/search/am/hostasset", json={"ServiceRequest": { "preferences": {"limitResults": 5}, "filters": {"Criteria": [ {"field": "name", "operator": "CONTAINS", "value": hostname} ]} }}, auth=(qualys_user, qualys_pass), verify=False, timeout=60, proxies=proxies, headers={"Content-Type": "application/json"} ) if r.status_code == 200 and "SUCCESS" in r.text: ids = parse_xml(r.text, "id") if ids: return int(ids[0]) except Exception: pass return None def get_vuln_counts(db, ip_list, force_refresh=False): """Recupere le nombre de vulnerabilites actives severity 3,4,5 pour une liste d'IPs. ip_list: str (IPs separees par virgules) Retourne dict {ip: {severity3, severity4, severity5, total, confirmed, potential}} """ cache_key = f"qualys:vulns:{ip_list}" if not force_refresh: cached = _cache.get(cache_key) if cached is not None: return cached qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user or not ip_list: return {} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/", data={ "action": "list", "ips": str(ip_list), "severities": "3,4,5", "status": "New,Active,Re-Opened", "show_results": "0", "output_format": "XML", }, auth=(qualys_user, qualys_pass), verify=False, timeout=300, proxies=proxies, headers={"X-Requested-With": "Python"}, ) except Exception: return {} if r.status_code != 200: return {} txt = r.text results = {} for host_block in txt.split("")[1:]: host_block = host_block.split("")[0] host_ip = (parse_xml(host_block, "IP") or [""])[0] if not host_ip: continue counts = {"severity3": 0, "severity4": 0, "severity5": 0, "total": 0, "confirmed": 0, "potential": 0} for det_block in host_block.split("")[1:]: det_block = det_block.split("")[0] severity = (parse_xml(det_block, "SEVERITY") or ["0"])[0] det_type = (parse_xml(det_block, "TYPE") or [""])[0] 
sev = int(severity) if severity.isdigit() else 0 if sev < 3: continue if det_type not in ("Confirmed", "Potential"): continue counts["total"] += 1 if sev == 3: counts["severity3"] += 1 elif sev == 4: counts["severity4"] += 1 elif sev == 5: counts["severity5"] += 1 if det_type == "Confirmed": counts["confirmed"] += 1 elif det_type == "Potential": counts["potential"] += 1 results[host_ip] = counts _cache.set(cache_key, results, CACHE_TTL) return results def get_activation_keys(db, force_refresh=False): """Recupere les activation keys Qualys (cache only par défaut)""" cache_key = "qualys:actkeys" cached = _cache.get(cache_key) if cached is not None: return cached if not force_refresh: # Pas de cache, pas d'appel API au chargement de page return [] qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return [] proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/1.0/search/ca/agentactkey", json={"ServiceRequest": {"preferences": {"limitResults": 20}}}, auth=(qualys_user, qualys_pass), verify=False, timeout=5, proxies=proxies, headers={"X-Requested-With": "Python", "Content-Type": "application/json"}) except Exception: return [] if r.status_code != 200: return [] keys = [] for block in r.text.split("")[1:]: block = block.split("")[0] keys.append({ "id": (parse_xml(block, "id") or [""])[0], "key": (parse_xml(block, "activationKey") or [""])[0], "status": (parse_xml(block, "status") or [""])[0], "title": (parse_xml(block, "title") or [""])[0], "used": (parse_xml(block, "countUsed") or ["0"])[0], "type": (parse_xml(block, "type") or [""])[0], }) _cache.set(cache_key, keys, 86400) # 1h pour les keys return keys def get_agents_summary(db): """Resume des agents installes depuis les assets en base""" rows = db.execute(text(""" SELECT agent_status, COUNT(*) as cnt, COUNT(*) FILTER (WHERE agent_version IS NOT NULL AND agent_version != '') as with_version FROM 
qualys_assets WHERE agent_status IS NOT NULL AND agent_status != '' GROUP BY agent_status ORDER BY cnt DESC """)).fetchall() versions = db.execute(text(""" SELECT agent_version, COUNT(*) as cnt FROM qualys_assets WHERE agent_version IS NOT NULL AND agent_version != '' GROUP BY agent_version ORDER BY cnt DESC LIMIT 20 """)).fetchall() total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar() active = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%active%%' AND agent_status NOT ILIKE '%%inactive%%'")).scalar() inactive = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%inactive%%'")).scalar() return {"statuses": rows, "versions": versions, "total_assets": total, "active": active, "inactive": inactive} def invalidate_search_cache(): """Invalide tout le cache de recherche Qualys""" _cache.clear_prefix("qualys:search:") _cache.clear_prefix("qualys:vulns:") def get_cache_stats(): """Stats du cache""" return _cache.stats() def refresh_all_agents(db, mode="diff"): """Rafraichit les agents depuis l'API Qualys QPS. mode='diff' (defaut) : ne pull que les assets dont lastCheckedIn > dernier sync diff Court (~30s), pour cron frequent. mode='full' : pull tous les assets matchant le filtre tag. Long (5-10 min), pour ménage hebdo. """ global _refresh_running if not _refresh_lock.acquire(blocking=False): return {"ok": False, "msg": "Une synchronisation Qualys est déjà en cours", "busy": True} _refresh_running = True _refresh_cancel.clear() try: return _refresh_all_agents_impl(db, mode=mode) finally: _refresh_running = False _refresh_lock.release() def _refresh_all_agents_impl(db, mode="diff"): """Implémentation réelle du refresh (appelée sous verrou). Optimisations 2026-04-16 (audit perf /qualys/agents): - Pré-chargement servers en dict Python (O(1) au lieu de 2 queries SQL par asset) - UPSERT INSERT ... 
ON CONFLICT DO UPDATE (une seule query au lieu de SELECT+INSERT/UPDATE) - HTTP Session réutilisée (keep-alive, évite le handshake SSL à chaque page) - 5 filtres tagName exécutés en parallèle via ThreadPoolExecutor - Dédup par asset_id (un asset couvert par plusieurs tags n'est traité qu'une fois) - Commit unique en fin de traitement (au lieu de savepoint + commit par asset) - max_pages par filtre étendu (30 → 500) pour éviter syncs incomplets silencieux """ from .secrets_service import get_secret, set_secret from datetime import datetime, timezone from concurrent.futures import ThreadPoolExecutor, as_completed # En mode diff : recupere le timestamp du dernier diff sync last_diff_iso = None if mode == "diff": last_diff_iso = get_secret(db, "qualys_last_diff_sync") total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar() or 0 if total > 0 and last_diff_iso: try: last_dt = datetime.fromisoformat(last_diff_iso.replace("Z", "+00:00")) age_min = (datetime.now(timezone.utc) - last_dt).total_seconds() / 60 if age_min < 5: return {"ok": True, "msg": f"Diff sync deja effectue il y a {int(age_min)} min, rien a faire", "skipped_all": True, "mode": mode} except Exception: pass # Sauve immediatement le timestamp de DEBUT (couvre annulation + evite de rater # les updates qui se produisent pendant la sync elle-meme) if mode == "diff": try: start_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") set_secret(db, "qualys_last_diff_sync", start_iso, "Timestamp dernier sync Qualys diff (debut)") db.commit() except Exception: pass qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return {"ok": False, "msg": "Credentials Qualys non configurés"} proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0, "skipped": 0} # flush eventuel pending try: db.commit() except Exception: pass # --- OPTIM 1 : pré-chargement des servers en RAM 
----------------------- # Remplace 2 queries SQL par asset par un lookup dict O(1). servers_by_hostname = {} servers_by_ip = {} servers_need_fqdn = set() # ids des servers ayant fqdn NULL ou '' (backfill cible) try: for row in db.execute(text( "SELECT id, LOWER(hostname) AS hn, (fqdn IS NULL OR fqdn='') AS needs_fqdn FROM servers" )).fetchall(): if row.hn: servers_by_hostname[row.hn] = row.id if row.needs_fqdn: servers_need_fqdn.add(row.id) for row in db.execute(text( "SELECT s.id, si.ip_address::text AS ip FROM servers s JOIN server_ips si ON si.server_id=s.id" )).fetchall(): if row.ip: servers_by_ip[row.ip] = row.id except Exception: pass # --- OPTIM 2 : HTTP Session réutilisée --------------------------------- # Évite d'ouvrir une nouvelle connexion SSL + auth à chaque page. sess = requests.Session() sess.auth = (qualys_user, qualys_pass) sess.headers.update({"X-Requested-With": "PatchCenter", "Content-Type": "application/json"}) sess.verify = False if proxies: sess.proxies.update(proxies) MAX_PAGES_PER_FILTER = 500 # 500 x 100 = 50 000 assets/filtre (garde-fou large) def fetch_filter_pages(tag_filter): """Récupère tous les blocks XML pour un filtre tag donné. 
Exécuté en thread worker (pas d'accès à `db` → thread-safe).""" blocks_out = [] local_pages = 0 f_last_id = None while local_pages < MAX_PAGES_PER_FILTER: if _refresh_cancel.is_set(): return blocks_out, local_pages criteria = [{"field": "tagName", "operator": "CONTAINS", "value": tag_filter}] if f_last_id: criteria.append({"field": "id", "operator": "GREATER", "value": str(f_last_id)}) if mode == "diff" and last_diff_iso: criteria.append({"field": "updated", "operator": "GREATER", "value": last_diff_iso}) payload = {"ServiceRequest": { "preferences": {"limitResults": 100}, "filters": {"Criteria": criteria} }} r = None for attempt in range(3): try: r = sess.post( f"{qualys_url}/qps/rest/2.0/search/am/hostasset", json=payload, timeout=600) break except Exception: import time as _t _t.sleep(2 * (attempt + 1)) if r is None or r.status_code != 200 or "SUCCESS" not in r.text: break local_pages += 1 page_blocks = r.text.split("")[1:] if not page_blocks: break new_last_id = f_last_id for block in page_blocks: block_end = block.split("")[0] blocks_out.append(block_end) m = re.search(r"(\d+)", block_end) if m: aid = int(m.group(1)) if new_last_id is None or aid > new_last_id: new_last_id = aid if "true" not in r.text or new_last_id == f_last_id: break f_last_id = new_last_id return blocks_out, local_pages # --- OPTIM 3 : 5 filtres en parallèle --------------------------------- tag_filters = ["SRV", "server", "SED", "SEI", "EMV"] all_blocks = [] seen_asset_ids = set() with ThreadPoolExecutor(max_workers=len(tag_filters)) as executor: futures = {executor.submit(fetch_filter_pages, tf): tf for tf in tag_filters} for future in as_completed(futures): try: blocks, page_count = future.result() stats["pages"] += page_count # Dédup par asset_id (un asset avec tags SRV + server apparait 2 fois) for b in blocks: m = re.search(r"(\d+)", b) if m: aid = int(m.group(1)) if aid not in seen_asset_ids: seen_asset_ids.add(aid) all_blocks.append(b) except Exception: stats["errors"] += 1 if 
_refresh_cancel.is_set(): stats["ok"] = True stats["cancelled"] = True stats["msg"] = f"Annulé apres fetch ({stats['pages']} pages, {len(all_blocks)} assets uniques)" return stats # --- OPTIM 4 : UPSERT INSERT ... ON CONFLICT DO UPDATE ----------------- # RETURNING (xmax = 0) AS inserted : true = INSERT, false = UPDATE # (xmax=0 sur une ligne fraichement insérée, != 0 quand update) UPSERT_SQL = text(""" INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family, agent_status, agent_version, last_checkin, server_id) VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid) ON CONFLICT (qualys_asset_id) DO UPDATE SET name = EXCLUDED.name, hostname = EXCLUDED.hostname, fqdn = EXCLUDED.fqdn, ip_address = EXCLUDED.ip_address, os = EXCLUDED.os, os_family = EXCLUDED.os_family, agent_status = EXCLUDED.agent_status, agent_version = EXCLUDED.agent_version, last_checkin = EXCLUDED.last_checkin, server_id = EXCLUDED.server_id, updated_at = now() RETURNING (xmax = 0) AS inserted """) FQDN_UPDATE_SQL = text( "UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')" ) def _valid(h): if not h: return False h = h.strip().lower() if h in ("localhost", ""): return False if re.match(r"^\d+\.\d+\.\d+\.\d+$", h): return False return True # Traitement séquentiel (session DB non thread-safe, mais plus de 2 queries/asset) for block in all_blocks: if _refresh_cancel.is_set(): break try: asset_id = (parse_xml(block, "id") or [""])[0] if not asset_id or not asset_id.isdigit(): continue name = (parse_xml(block, "name") or [""])[0] address = (parse_xml(block, "address") or [""])[0] fqdn = (parse_xml(block, "fqdn") or [""])[0] netbios = (parse_xml(block, "netbiosName") or [""])[0] os_val = (parse_xml(block, "os") or [""])[0] hostname_src = name if _valid(name.split(".")[0]) else (fqdn if _valid(fqdn.split(".")[0]) else netbios) hostname = hostname_src.split(".")[0].lower() if hostname_src else "" agent_status = "" agent_version = "" 
last_checkin = None if "" in block: ab = block[block.find(""):block.find("")] agent_status = (parse_xml(ab, "status") or [""])[0] agent_version = (parse_xml(ab, "agentVersion") or [""])[0] lc = (parse_xml(ab, "lastCheckedIn") or [""])[0] last_checkin = lc if lc else None os_family = None if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")): os_family = "linux" elif "windows" in os_val.lower(): os_family = "windows" # Lookup O(1) au lieu de 2 queries SQL server_id = servers_by_hostname.get(hostname) if hostname else None if not server_id and address: server_id = servers_by_ip.get(address) # FQDN backfill ciblé (seulement si le server en a besoin) if server_id and fqdn and server_id in servers_need_fqdn: try: db.execute(FQDN_UPDATE_SQL, {"fqdn": fqdn, "sid": server_id}) servers_need_fqdn.discard(server_id) except Exception: pass result = db.execute(UPSERT_SQL, { "qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None, "ip": address or None, "os": os_val, "osf": os_family, "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id, }) row = result.fetchone() if row and row.inserted: stats["created"] += 1 else: stats["updated"] += 1 except Exception: stats["errors"] += 1 # Commit final unique try: db.commit() except Exception: try: db.rollback() except Exception: pass stats["ok"] = True stats["mode"] = mode stats["msg"] = (f"[{mode}] {stats['created']} créés, {stats['updated']} mis à jour " f"({stats['pages']} pages // {len(tag_filters)} filtres, " f"{len(all_blocks)} assets uniques, {stats['errors']} erreurs)") if mode == "diff": try: now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") set_secret(db, "qualys_last_diff_sync", now_iso, "Timestamp dernier sync Qualys diff") db.commit() except Exception: pass return stats def cancel_refresh(): """Demande l'annulation du refresh Qualys en cours""" _refresh_cancel.set() return {"ok": True, "msg": "Annulation demandée"} def is_refresh_running(): 
return _refresh_running # =========================================================================== # DASHBOARD VULNERABILITES — agregation par dimension + historique # Tables: qualys_vuln_snapshot_run (un row par calcul) + qualys_vuln_snapshot # Dimensions: global, env, pos, os, domain, env_pos # Niveaux: critical (sev5), high (sev4), medium (sev3), sain, non_scanne # Un serveur est compte dans son niveau MAX (1 seule colonne) # =========================================================================== _dashboard_running = False def is_dashboard_running(): return _dashboard_running def _classify_severity(vc): if not isinstance(vc, dict): return "sain" if vc.get("severity5", 0) > 0: return "critical" if vc.get("severity4", 0) > 0: return "high" if vc.get("severity3", 0) > 0: return "medium" return "sain" def _is_scanned(asset_row, has_vuln_data): status = (asset_row.agent_status or "").lower() if status in ("active", "ok", "status_active"): return True if has_vuln_data: return True return False def _fetch_asset_ids_by_tag(db, tag_name): """Appelle Qualys API pour recuperer la liste d'asset_ids ayant ce tag.""" qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) if not qualys_user: return set() proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None try: r = requests.post( f"{qualys_url}/qps/rest/2.0/search/am/hostasset", json={"ServiceRequest": { "preferences": {"limitResults": 1000}, "filters": {"Criteria": [ {"field": "tagName", "operator": "EQUALS", "value": tag_name} ]} }}, auth=(qualys_user, qualys_pass), verify=False, timeout=120, proxies=proxies, headers={"Content-Type": "application/json"} ) if r.status_code != 200 or "SUCCESS" not in r.text: return set() ids = set() for block in r.text.split("")[1:]: block = block.split("")[0] aid = (parse_xml(block, "id") or [""])[0] if aid and aid.isdigit(): ids.add(int(aid)) return ids except Exception: return set() def compute_vuln_dashboard(db, triggered_by="manual", 
run_id=None): """Calcule un nouveau snapshot. Interroge l'API Qualys par tag pour avoir les vraies associations asset<->tag (la DB locale qualys_asset_tags peut etre obsolete). Si run_id fourni, l'utilise (sinon en cree un).""" import time from concurrent.futures import ThreadPoolExecutor t0 = time.time() try: # 1. Creer le run en pending si pas deja fourni if run_id is None: run_id = db.execute(text(""" INSERT INTO qualys_vuln_snapshot_run (status, triggered_by) VALUES ('pending', :tb) RETURNING id """), {"tb": triggered_by}).scalar() db.commit() # 2. Tags a interroger : ENV-*, POS-*, OS-* tag_rows = db.execute(text("""SELECT name FROM qualys_tags WHERE name ~ '^(ENV|POS|OS)-' ORDER BY name""")).fetchall() tag_names = [r.name for r in tag_rows] # 3. Pour chaque tag, appel API parallele -> {tag_name: set(asset_id)} def fetch_one(name): return name, _fetch_asset_ids_by_tag(db, name) tag_id_map = {} with ThreadPoolExecutor(max_workers=8) as ex: for name, ids in ex.map(fetch_one, tag_names): tag_id_map[name] = ids # 4. Charger tous les assets locaux (pour total + IPs + agent_status + domain_ltd) rows = db.execute(text(""" SELECT qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status, qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd FROM qualys_assets qa LEFT JOIN servers s ON s.id = qa.server_id """)).fetchall() asset_count = len(rows) # 5. 
Recuperer vulns par batch de 50 IPs en parallele (cache 10min via get_vuln_counts) # Une session par thread pour eviter les conflits SQLAlchemy from app.database import SessionLocal ip_to_vuln = {} unique_ips = list({str(r.ip_address) for r in rows if r.ip_address and str(r.ip_address) != "None"}) batches = [unique_ips[i:i+50] for i in range(0, len(unique_ips), 50)] def _fetch_vuln_batch(batch): s = SessionLocal() try: return get_vuln_counts(s, ",".join(batch)) except Exception: return {} finally: s.close() with ThreadPoolExecutor(max_workers=8) as ex: for vmap in ex.map(_fetch_vuln_batch, batches): if vmap: ip_to_vuln.update(vmap) # 6. Classifier + agreger en utilisant les associations API agg = {} def _bump(dim, val, val2, level): key = (dim, val or "(none)", val2 or "") if key not in agg: agg[key] = {"total": 0, "critical": 0, "high": 0, "medium": 0, "sain": 0, "non_scanne": 0} agg[key]["total"] += 1 agg[key][level] += 1 for r in rows: aid = int(r.qualys_asset_id) ip = str(r.ip_address) if r.ip_address else None vc = ip_to_vuln.get(ip) if ip else None scanned = _is_scanned(r, vc is not None) level = "non_scanne" if not scanned else _classify_severity(vc or {}) envs = sorted(t for t, ids in tag_id_map.items() if t.startswith("ENV-") and aid in ids) poses = sorted(t for t, ids in tag_id_map.items() if t.startswith("POS-") and aid in ids) oses = sorted(t for t, ids in tag_id_map.items() if t.startswith("OS-") and aid in ids) dom = r.domain_ltd or "(sans domaine)" _bump("global", "all", "", level) for e in envs or ["(sans env)"]: _bump("env", e, "", level) for pos in poses or ["(sans pos)"]: _bump("pos", pos, "", level) for o in oses or ["(sans os)"]: _bump("os", o, "", level) _bump("domain", dom, "", level) for e in envs or ["(sans env)"]: for pos in poses or ["(sans pos)"]: _bump("env_pos", e, pos, level) # 7. 
def load_vuln_dashboard(db):
    """Load the most recent successful snapshot run and its snapshot rows.

    Returns a dict with the keys ``global`` (a single entry or None),
    ``env``, ``pos``, ``os``, ``domain``, ``env_pos`` (lists of entries) and
    ``last_run`` (the run row, or None when no run has status 'ok').

    Each entry carries the raw counters plus three derived values:
    ``vuln_total`` (critical + high + medium), ``scanned``
    (total - non_scanne) and ``pct_vuln`` (share of scanned assets that are
    vulnerable, rounded to one decimal; 0 when nothing was scanned).
    """
    latest = db.execute(text("""SELECT id, run_at, asset_count, duration_sec, triggered_by FROM qualys_vuln_snapshot_run WHERE status='ok' ORDER BY run_at DESC LIMIT 1""")).fetchone()

    dashboard = {"global": None, "env": [], "pos": [], "os": [], "domain": [],
                 "env_pos": [], "last_run": None}
    if not latest:
        # No successful run yet: hand back the empty skeleton.
        return dashboard
    dashboard["last_run"] = latest

    snapshot_rows = db.execute(text("""SELECT dimension, dimension_value, dimension_value2, total, critical, high, medium, sain, non_scanne FROM qualys_vuln_snapshot WHERE run_id=:rid ORDER BY dimension, dimension_value, dimension_value2"""), {"rid": latest.id}).fetchall()

    for row in snapshot_rows:
        vulnerable = row.critical + row.high + row.medium
        scanned = row.total - row.non_scanne
        if scanned > 0:
            pct_vuln = round(100 * vulnerable / scanned, 1)
        else:
            pct_vuln = 0
        entry = {
            "name": row.dimension_value,
            "name2": row.dimension_value2,
            "total": row.total,
            "critical": row.critical,
            "high": row.high,
            "medium": row.medium,
            "sain": row.sain,
            "non_scanne": row.non_scanne,
            "vuln_total": vulnerable,
            "scanned": scanned,
            "pct_vuln": pct_vuln,
        }
        if row.dimension == "global":
            # The global dimension is a single entry, not a list.
            dashboard["global"] = entry
        elif row.dimension in dashboard:
            dashboard[row.dimension].append(entry)
    return dashboard
def load_vuln_history(db, period="day", days=30, dimension="global", dimension_value="all"):
    """Return the evolution of one snapshot dimension over time.

    period: 'month' or 'week' select the matching ``date_trunc`` unit; any
        other value falls back to 'day'.
    days: size of the look-back window, in days, counted back from now().
    dimension / dimension_value: which snapshot series to read
        (default: global / all).

    Only runs with status 'ok' are considered. When several runs fall in the
    same bucket, the one with the latest run_at wins.

    Returns a list of dicts sorted by bucket, each with the keys bucket
    (ISO string), total, critical, high, medium, sain, non_scanne and
    vuln_total (critical + high + medium).
    """
    # Whitelist the truncation unit so nothing caller-supplied reaches the SQL text.
    unit = period if period in ("month", "week") else "day"
    bucket_expr = f"date_trunc('{unit}', r.run_at)"

    params = {"days": days, "dim": dimension, "dv": dimension_value}
    rows = db.execute(text(f""" SELECT {bucket_expr} as bucket, s.total, s.critical, s.high, s.medium, s.sain, s.non_scanne, r.run_at FROM qualys_vuln_snapshot s JOIN qualys_vuln_snapshot_run r ON r.id = s.run_id WHERE r.status='ok' AND r.run_at >= now() - (:days || ' days')::interval AND s.dimension = :dim AND s.dimension_value = :dv ORDER BY r.run_at """), params).fetchall()

    # Rows arrive ordered by run_at, so later rows overwrite earlier ones and
    # each bucket ends up holding its last snapshot.
    latest_per_bucket = {row.bucket: row for row in rows}

    history = []
    for bucket in sorted(latest_per_bucket):
        snap = latest_per_bucket[bucket]
        history.append({
            "bucket": bucket.isoformat(),
            "total": snap.total,
            "critical": snap.critical,
            "high": snap.high,
            "medium": snap.medium,
            "sain": snap.sain,
            "non_scanne": snap.non_scanne,
            "vuln_total": snap.critical + snap.high + snap.medium,
        })
    return history
def fetch_all_qualys_assets(db, with_progress=False):
    """Fetch the Qualys SERVER assets through the hostasset search API.

    Two searches are run, one per tag ("Linux Server" and "Windows Server"),
    so the filtering happens on the API side instead of downloading every
    asset. Each search is paginated with an ``id GREATER <last_id>``
    criterion and a page size of 1000.

    db: session handed to ``_get_qualys_creds`` to read the credentials.
    with_progress: accepted for backward compatibility; currently unused.

    Returns a list of dicts {id, name, ip, os, last_check, agent_status},
    deduplicated by id. Returns [] when no Qualys user is configured.
    Best-effort: a request error or a non-SUCCESS response stops the
    pagination for that tag and whatever was collected so far is kept.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return []
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    page_size = 1000

    def first_value(block, tag):
        # First occurrence of <tag>...</tag> in the block, or "" when absent.
        values = parse_xml(block, tag)
        return values[0] if values else ""

    def fetch_by_tag(tag_name):
        results = []
        last_id = 0
        while True:
            criteria = [{"field": "tagName", "operator": "EQUALS", "value": tag_name}]
            if last_id:
                # Resume after the highest id seen on the previous page.
                criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
            body = {"ServiceRequest": {"preferences": {"limitResults": page_size},
                                       "filters": {"Criteria": criteria}}}
            try:
                r = requests.post(
                    f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
                    json=body,
                    auth=(qualys_user, qualys_pass),
                    verify=False,
                    timeout=180,
                    proxies=proxies,
                    headers={"Content-Type": "application/json"},
                )
            except Exception:
                # Deliberate best-effort: keep the pages already fetched.
                break
            if r.status_code != 200 or "SUCCESS" not in r.text:
                break

            batch = []
            # Splitting on an empty separator raises ValueError, so the
            # response is cut on the <HostAsset> element delimiters.
            # NOTE(review): tag name taken from the Qualys hostasset XML
            # response format - confirm against a real payload.
            for block in r.text.split("<HostAsset>")[1:]:
                block = block.split("</HostAsset>")[0]
                aid = first_value(block, "id")
                if not (aid and aid.isdigit()):
                    continue
                batch.append({
                    "id": int(aid),
                    "name": first_value(block, "name"),
                    "ip": first_value(block, "address"),
                    "os": first_value(block, "os"),
                    "last_check": first_value(block, "lastCheckedIn"),
                    "agent_status": first_value(block, "status"),
                })

            if not batch:
                break
            results.extend(batch)
            last_id = max(item["id"] for item in batch)
            if len(batch) < page_size:
                # Short page: nothing left to fetch for this tag.
                break
        return results

    # Linux Server + Windows Server are searched separately, then merged.
    linux = fetch_by_tag("Linux Server")
    windows = fetch_by_tag("Windows Server")

    # Dedupe by id, keeping the first occurrence (an asset could carry both tags).
    seen = set()
    out = []
    for asset in linux + windows:
        if asset["id"] not in seen:
            seen.add(asset["id"])
            out.append(asset)
    return out
def find_duplicate_hostnames(db, force_refresh=False):
    """Identify hostnames that appear more than once among the Qualys assets.

    Assets are grouped by short name (the part before the first dot,
    lowercased). Assets with an empty name, an all-digit short name (an IP
    used as a name) or a workstation OS are ignored.

    The result is cached under "qualys:duplicates" for CACHE_TTL seconds;
    force_refresh=True skips the cache lookup.

    Returns {total_assets, duplicate_hostnames, total_zombies, groups} where
    groups is sorted by descending count, then by short name, and each
    group's assets are sorted by last_check, most recent first.
    """
    from collections import defaultdict

    cache_key = "qualys:duplicates"
    if not force_refresh:
        hit = _cache.get(cache_key)
        if hit is not None:
            return hit

    assets = fetch_all_qualys_assets(db)

    # OS fragments identifying a workstation; only servers are of interest.
    WKS_PATTERNS = ("Windows 10", "Windows 11", "Windows 7", "Windows 8",
                    "Windows XP", "Windows Vista", "macOS")

    def is_workstation(os_str):
        # An explicit "Server" in the OS string always wins over the patterns.
        if not os_str or "Server" in os_str:
            return False
        for pattern in WKS_PATTERNS:
            if pattern in os_str:
                return True
        return False

    by_short = defaultdict(list)
    for asset in assets:
        full_name = asset["name"]
        short = full_name.split(".")[0].lower() if full_name else ""
        if not short:
            continue
        # Names that are really IP addresses yield an all-digit short name.
        if short.replace(".", "").isdigit():
            continue
        if is_workstation(asset.get("os", "")):
            continue
        by_short[short].append(asset)

    dupes = []
    for short, members in by_short.items():
        if len(members) < 2:
            continue
        # Most recently checked-in asset first.
        members.sort(key=lambda a: a["last_check"] or "", reverse=True)
        newest = members[0]
        oldest = members[-1]
        dupes.append({
            "shortname": short,
            "count": len(members),
            "assets": members,
            "newest_check": newest["last_check"] or "",
            "oldest_check": oldest["last_check"] or "",
        })

    # Biggest groups first, ties broken alphabetically.
    dupes.sort(key=lambda g: (-g["count"], g["shortname"]))

    zombies = 0
    for group in dupes:
        zombies += group["count"] - 1

    result = {"total_assets": len(assets),
              "duplicate_hostnames": len(dupes),
              "total_zombies": zombies,
              "groups": dupes}
    _cache.set(cache_key, result, CACHE_TTL)
    return result
def delete_qualys_asset(db, asset_id):
    """Delete one Qualys host asset through the API.

    db: session handed to ``_get_qualys_creds`` to read the credentials.
    asset_id: Qualys asset id; converted with int() before being put in the URL.

    Returns {ok, msg}. On success the "qualys:duplicates" cache entry is
    dropped so the next duplicate lookup is recomputed. Any exception
    (including a non-numeric asset_id) is reported in msg rather than raised.
    """
    base_url, user, password, proxy = _get_qualys_creds(db)
    if not user:
        return {"ok": False, "msg": "Pas de creds"}

    proxies = None
    if proxy:
        proxies = {"https": proxy, "http": proxy}

    try:
        endpoint = f"{base_url}/qps/rest/2.0/delete/am/hostasset/{int(asset_id)}"
        resp = requests.post(
            endpoint,
            auth=(user, password),
            verify=False,
            timeout=60,
            proxies=proxies,
            headers={"Content-Type": "application/json"},
        )
        deleted = resp.status_code == 200 and "SUCCESS" in resp.text
        if not deleted:
            return {"ok": False, "msg": f"HTTP {resp.status_code}: {resp.text[:300]}"}
        # Invalidate the cached duplicate list to force a refresh.
        _cache.delete("qualys:duplicates")
        return {"ok": True, "msg": "Supprime"}
    except Exception as e:
        return {"ok": False, "msg": str(e)}