patchcenter/app/services/qualys_service.py

1262 lines
52 KiB
Python

"""Service Qualys — sync tags pour un serveur via API + cache memoire"""
import re
import requests
import threading
_refresh_lock = threading.Lock()
_refresh_running = False
_refresh_cancel = threading.Event()
import urllib3
from sqlalchemy import text
from .secrets_service import get_secret
from . import cache as _cache
urllib3.disable_warnings()
CACHE_TTL = 600 # 10 minutes
def _get_qualys_creds(db):
"""Recupere les credentials Qualys depuis les secrets chiffres"""
url = get_secret(db, "qualys_url") or "https://qualysapi.qualys.eu"
user = get_secret(db, "qualys_user") or ""
pwd = get_secret(db, "qualys_pass") or ""
proxy = get_secret(db, "qualys_proxy") or ""
bypass = (get_secret(db, "qualys_bypass_proxy") or "").lower() == "true"
if bypass:
proxy = ""
return url, user, pwd, proxy
def parse_xml(txt, tag):
return re.findall(f"<{tag}>([^<]*)</{tag}>", txt)
def search_assets_api(db, query, field="name", operator="CONTAINS", force_refresh=False):
"""Recherche des assets via l'API Qualys — cache 10 min"""
cache_key = f"qualys:search:{field}:{query}"
if not force_refresh:
cached = _cache.get(cache_key)
if cached is not None:
cached["msg"] = cached.get("msg", "") + " (cache)"
cached["from_cache"] = True
return cached
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials Qualys non configurés", "assets": []}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/search/am/hostasset",
json={"ServiceRequest": {
"preferences": {"limitResults": 200},
"filters": {"Criteria": [
{"field": field, "operator": operator, "value": query}
]}
}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=60, proxies=proxies,
headers={"Content-Type": "application/json"}
)
except Exception as e:
return {"ok": False, "msg": f"Erreur API: {e}", "assets": []}
if r.status_code != 200 or "SUCCESS" not in r.text:
return {"ok": False, "msg": f"API HTTP {r.status_code}", "assets": []}
assets = _parse_assets_full(r.text)
result = {"ok": True, "msg": f"{len(assets)} résultat(s)", "assets": assets, "from_cache": False}
_cache.set(cache_key, result, CACHE_TTL)
return result
def get_all_tags_api(db):
"""Récupère tous les tags depuis l'API Qualys"""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials non configurés", "tags": []}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/search/am/tag",
json={"ServiceRequest": {"preferences": {"limitResults": 1000}}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=60, proxies=proxies,
headers={"Content-Type": "application/json"}
)
except Exception as e:
return {"ok": False, "msg": str(e), "tags": []}
if r.status_code != 200 or "SUCCESS" not in r.text:
return {"ok": False, "msg": f"HTTP {r.status_code}", "tags": []}
tags = []
for block in r.text.split("<Tag>")[1:]:
block = block.split("</Tag>")[0]
tid = (parse_xml(block, "id") or [""])[0]
tname = (parse_xml(block, "name") or [""])[0]
rule_type = (parse_xml(block, "ruleType") or [""])[0]
if tid and tname:
tags.append({"id": int(tid), "name": tname, "is_dynamic": bool(rule_type), "rule_type": rule_type})
return {"ok": True, "msg": f"{len(tags)} tags", "tags": tags}
def _parse_assets_full(text):
"""Parse le XML Qualys en liste de dicts enrichis"""
assets = []
for block in text.split("<HostAsset>")[1:]:
block = block.split("</HostAsset>")[0]
aid = (parse_xml(block, "id") or [""])[0]
name = (parse_xml(block, "name") or [""])[0]
fqdn = (parse_xml(block, "fqdn") or [""])[0]
address = (parse_xml(block, "address") or [""])[0]
os_val = (parse_xml(block, "os") or [""])[0]
agent_status = ""
agent_version = ""
last_checkin = ""
if "<agentInfo>" in block:
agent_status = (parse_xml(block, "status") or [""])[0]
agent_version = (parse_xml(block, "agentVersion") or [""])[0]
last_checkin = (parse_xml(block, "lastCheckedIn") or [""])[0]
# Tags
tags = []
if "<tags>" in block:
tag_block = block.split("<tags>")[1].split("</tags>")[0]
tag_names = parse_xml(tag_block, "name")
tags = tag_names
netbios = (parse_xml(block, "netbiosName") or [""])[0]
hostname_src = fqdn or netbios or name
hostname = hostname_src.split(".")[0].lower() if hostname_src else ""
assets.append({
"qualys_asset_id": int(aid) if aid else None,
"name": name, "hostname": hostname, "fqdn": fqdn,
"ip_address": address, "os": os_val,
"agent_status": agent_status, "agent_version": agent_version,
"last_checkin": last_checkin, "tags": tags,
"tags_list": ", ".join(tags),
})
return assets
def create_tag_api(db, tag_name):
"""Crée un tag statique dans Qualys via API"""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/create/am/tag",
json={"ServiceRequest": {"data": {"Tag": {"name": tag_name}}}},
auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
headers={"Content-Type": "application/json"})
if r.status_code == 200 and "SUCCESS" in r.text:
tid = (parse_xml(r.text, "id") or [""])[0]
if tid:
db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic) VALUES (:tid, :n, false)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name
"""), {"tid": int(tid), "n": tag_name})
db.commit()
return {"ok": True, "msg": f"Tag '{tag_name}' créé (ID: {tid})"}
return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def delete_tag_api(db, qualys_tag_id):
"""Supprime un tag dans Qualys via API"""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/delete/am/tag/{qualys_tag_id}",
auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
headers={"Content-Type": "application/json"})
if r.status_code == 200 and "SUCCESS" in r.text:
db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id})
db.execute(text("DELETE FROM qualys_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id})
db.commit()
return {"ok": True, "msg": "Tag supprimé"}
return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def add_tag_to_asset_api(db, asset_id, tag_id):
"""Ajoute un tag à un asset via API Qualys"""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/update/am/hostasset/{asset_id}",
json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"add": {"TagSimple": {"id": tag_id}}}}}}},
auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
headers={"Content-Type": "application/json"})
if r.status_code == 200 and "SUCCESS" in r.text:
db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:aid, :tid) ON CONFLICT DO NOTHING
"""), {"aid": asset_id, "tid": tag_id})
db.commit()
return {"ok": True, "msg": "Tag ajouté"}
return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def remove_tag_from_asset_api(db, asset_id, tag_id):
"""Retire un tag d'un asset via API Qualys"""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/update/am/hostasset/{asset_id}",
json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"remove": {"TagSimple": {"id": tag_id}}}}}}},
auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
headers={"Content-Type": "application/json"})
if r.status_code == 200 and "SUCCESS" in r.text:
db.execute(text("""
DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :aid AND qualys_tag_id = :tid
"""), {"aid": asset_id, "tid": tag_id})
db.commit()
return {"ok": True, "msg": "Tag retiré"}
return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def resync_all_tags(db):
"""Resync tous les tags depuis l'API Qualys vers la base locale"""
result = get_all_tags_api(db)
if not result["ok"]:
return result
count = 0
for t in result["tags"]:
db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic, rule_type)
VALUES (:tid, :n, :dyn, :rt)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name, is_dynamic = EXCLUDED.is_dynamic,
rule_type = EXCLUDED.rule_type, updated_at = now()
"""), {"tid": t["id"], "n": t["name"], "dyn": t["is_dynamic"], "rt": t.get("rule_type")})
count += 1
db.commit()
return {"ok": True, "msg": f"{count} tags synchronisés"}
def sync_server_qualys(db, server_id):
"""Sync les tags Qualys pour un serveur donne. Retourne un dict resultat."""
row = db.execute(text(
"SELECT hostname, qualys_asset_id FROM servers WHERE id = :id"
), {"id": server_id}).fetchone()
if not row:
return {"ok": False, "msg": "Serveur introuvable"}
hostname = row.hostname
qid = row.qualys_asset_id
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials Qualys non configures (Settings)"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
# Chercher l'asset par hostname si pas de qualys_asset_id
if not qid:
qid = _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies)
if not qid:
return {"ok": False, "msg": f"Asset '{hostname}' non trouve dans Qualys"}
db.execute(text("UPDATE servers SET qualys_asset_id = :qid WHERE id = :id"),
{"qid": qid, "id": server_id})
# Recuperer l'asset complet avec tags
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/search/am/hostasset",
json={"ServiceRequest": {
"filters": {"Criteria": [
{"field": "id", "operator": "EQUALS", "value": str(qid)}
]}
}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=60, proxies=proxies,
headers={"Content-Type": "application/json"}
)
except Exception as e:
return {"ok": False, "msg": f"Erreur API: {e}"}
if r.status_code != 200 or "SUCCESS" not in r.text:
return {"ok": False, "msg": f"API HTTP {r.status_code}"}
# Parser asset
blocks = r.text.split("<HostAsset>")
if len(blocks) < 2:
return {"ok": False, "msg": "Asset non trouve dans la reponse"}
block = blocks[1].split("</HostAsset>")[0]
fqdn = (parse_xml(block, "fqdn") or [""])[0]
address = (parse_xml(block, "address") or [""])[0]
os_val = (parse_xml(block, "os") or [""])[0]
agent_status = (parse_xml(block, "status") or [""])[0] if "<agentInfo>" in block else ""
agent_version = (parse_xml(block, "agentVersion") or [""])[0]
last_checkin = (parse_xml(block, "lastCheckedIn") or [""])[0] or None
os_family = None
os_low = os_val.lower()
if any(k in os_low for k in ("linux", "red hat", "centos", "debian")):
os_family = "linux"
elif "windows" in os_low:
os_family = "windows"
# Update qualys_assets
db.execute(text("""
INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
ON CONFLICT (qualys_asset_id) DO UPDATE SET
fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address, os=EXCLUDED.os,
os_family=EXCLUDED.os_family, agent_status=EXCLUDED.agent_status,
agent_version=EXCLUDED.agent_version, last_checkin=EXCLUDED.last_checkin, updated_at=now()
"""), {"qid": qid, "name": hostname, "hn": hostname.split(".")[0].lower(),
"fqdn": fqdn or None, "ip": address or None, "os": os_val, "osf": os_family,
"ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
# Enrichir servers
db.execute(text("""
UPDATE servers SET
fqdn = COALESCE(NULLIF(:fqdn, ''), fqdn),
os_version = COALESCE(NULLIF(:os, ''), os_version)
WHERE id = :id
"""), {"fqdn": fqdn, "os": os_val, "id": server_id})
# Tags
tag_count = 0
if "<tags>" in block:
tag_block = block.split("<tags>")[1].split("</tags>")[0]
tag_ids = parse_xml(tag_block, "id")
tag_names = parse_xml(tag_block, "name")
# Supprimer anciens liens
db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"), {"qid": qid})
for tid, tname in zip(tag_ids, tag_names):
# Upsert tag
db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name) VALUES (:tid, :tn)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name=EXCLUDED.name, updated_at=now()
"""), {"tid": int(tid), "tn": tname})
# Lien asset-tag
db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:qid, :tid) ON CONFLICT DO NOTHING
"""), {"qid": qid, "tid": int(tid)})
tag_count += 1
db.commit()
return {"ok": True, "msg": f"Synchro OK — {tag_count} tags", "tags": tag_count}
def _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies=None):
"""Cherche un asset Qualys par hostname"""
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/search/am/hostasset",
json={"ServiceRequest": {
"preferences": {"limitResults": 5},
"filters": {"Criteria": [
{"field": "name", "operator": "CONTAINS", "value": hostname}
]}
}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=60, proxies=proxies,
headers={"Content-Type": "application/json"}
)
if r.status_code == 200 and "SUCCESS" in r.text:
ids = parse_xml(r.text, "id")
if ids:
return int(ids[0])
except Exception:
pass
return None
def get_vuln_counts(db, ip_list, force_refresh=False):
"""Recupere le nombre de vulnerabilites actives severity 3,4,5 pour une liste d'IPs.
ip_list: str (IPs separees par virgules)
Retourne dict {ip: {severity3, severity4, severity5, total, confirmed, potential}}
"""
cache_key = f"qualys:vulns:{ip_list}"
if not force_refresh:
cached = _cache.get(cache_key)
if cached is not None:
return cached
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user or not ip_list:
return {}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/",
data={
"action": "list",
"ips": str(ip_list),
"severities": "3,4,5",
"status": "New,Active,Re-Opened",
"show_results": "0",
"output_format": "XML",
},
auth=(qualys_user, qualys_pass),
verify=False, timeout=300, proxies=proxies,
headers={"X-Requested-With": "Python"},
)
except Exception:
return {}
if r.status_code != 200:
return {}
txt = r.text
results = {}
for host_block in txt.split("<HOST>")[1:]:
host_block = host_block.split("</HOST>")[0]
host_ip = (parse_xml(host_block, "IP") or [""])[0]
if not host_ip:
continue
counts = {"severity3": 0, "severity4": 0, "severity5": 0,
"total": 0, "confirmed": 0, "potential": 0}
for det_block in host_block.split("<DETECTION>")[1:]:
det_block = det_block.split("</DETECTION>")[0]
severity = (parse_xml(det_block, "SEVERITY") or ["0"])[0]
det_type = (parse_xml(det_block, "TYPE") or [""])[0]
sev = int(severity) if severity.isdigit() else 0
if sev < 3:
continue
if det_type not in ("Confirmed", "Potential"):
continue
counts["total"] += 1
if sev == 3: counts["severity3"] += 1
elif sev == 4: counts["severity4"] += 1
elif sev == 5: counts["severity5"] += 1
if det_type == "Confirmed": counts["confirmed"] += 1
elif det_type == "Potential": counts["potential"] += 1
results[host_ip] = counts
_cache.set(cache_key, results, CACHE_TTL)
return results
def get_activation_keys(db, force_refresh=False):
"""Recupere les activation keys Qualys (cache only par défaut)"""
cache_key = "qualys:actkeys"
cached = _cache.get(cache_key)
if cached is not None:
return cached
if not force_refresh:
# Pas de cache, pas d'appel API au chargement de page
return []
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return []
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/1.0/search/ca/agentactkey",
json={"ServiceRequest": {"preferences": {"limitResults": 20}}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=5, proxies=proxies,
headers={"X-Requested-With": "Python", "Content-Type": "application/json"})
except Exception:
return []
if r.status_code != 200:
return []
keys = []
for block in r.text.split("<AgentActKey>")[1:]:
block = block.split("</AgentActKey>")[0]
keys.append({
"id": (parse_xml(block, "id") or [""])[0],
"key": (parse_xml(block, "activationKey") or [""])[0],
"status": (parse_xml(block, "status") or [""])[0],
"title": (parse_xml(block, "title") or [""])[0],
"used": (parse_xml(block, "countUsed") or ["0"])[0],
"type": (parse_xml(block, "type") or [""])[0],
})
_cache.set(cache_key, keys, 86400) # 1h pour les keys
return keys
def get_agents_summary(db):
"""Resume des agents installes depuis les assets en base"""
rows = db.execute(text("""
SELECT agent_status, COUNT(*) as cnt,
COUNT(*) FILTER (WHERE agent_version IS NOT NULL AND agent_version != '') as with_version
FROM qualys_assets
WHERE agent_status IS NOT NULL AND agent_status != ''
GROUP BY agent_status ORDER BY cnt DESC
""")).fetchall()
versions = db.execute(text("""
SELECT agent_version, COUNT(*) as cnt
FROM qualys_assets
WHERE agent_version IS NOT NULL AND agent_version != ''
GROUP BY agent_version ORDER BY cnt DESC LIMIT 20
""")).fetchall()
total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar()
active = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%active%%' AND agent_status NOT ILIKE '%%inactive%%'")).scalar()
inactive = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%inactive%%'")).scalar()
return {"statuses": rows, "versions": versions, "total_assets": total, "active": active, "inactive": inactive}
def invalidate_search_cache():
"""Invalide tout le cache de recherche Qualys"""
_cache.clear_prefix("qualys:search:")
_cache.clear_prefix("qualys:vulns:")
def get_cache_stats():
"""Stats du cache"""
return _cache.stats()
def refresh_all_agents(db, mode="diff"):
"""Rafraichit les agents depuis l'API Qualys QPS.
mode='diff' (defaut) : ne pull que les assets dont lastCheckedIn > dernier sync diff
Court (~30s), pour cron frequent.
mode='full' : pull tous les assets matchant le filtre tag.
Long (5-10 min), pour ménage hebdo.
"""
global _refresh_running
if not _refresh_lock.acquire(blocking=False):
return {"ok": False, "msg": "Une synchronisation Qualys est déjà en cours", "busy": True}
_refresh_running = True
_refresh_cancel.clear()
try:
return _refresh_all_agents_impl(db, mode=mode)
finally:
_refresh_running = False
_refresh_lock.release()
def _refresh_all_agents_impl(db, mode="diff"):
"""Implémentation réelle du refresh (appelée sous verrou).
Optimisations 2026-04-16 (audit perf /qualys/agents):
- Pré-chargement servers en dict Python (O(1) au lieu de 2 queries SQL par asset)
- UPSERT INSERT ... ON CONFLICT DO UPDATE (une seule query au lieu de SELECT+INSERT/UPDATE)
- HTTP Session réutilisée (keep-alive, évite le handshake SSL à chaque page)
- 5 filtres tagName exécutés en parallèle via ThreadPoolExecutor
- Dédup par asset_id (un asset couvert par plusieurs tags n'est traité qu'une fois)
- Commit unique en fin de traitement (au lieu de savepoint + commit par asset)
- max_pages par filtre étendu (30 → 500) pour éviter syncs incomplets silencieux
"""
from .secrets_service import get_secret, set_secret
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
# En mode diff : recupere le timestamp du dernier diff sync
last_diff_iso = None
if mode == "diff":
last_diff_iso = get_secret(db, "qualys_last_diff_sync")
total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar() or 0
if total > 0 and last_diff_iso:
try:
last_dt = datetime.fromisoformat(last_diff_iso.replace("Z", "+00:00"))
age_min = (datetime.now(timezone.utc) - last_dt).total_seconds() / 60
if age_min < 5:
return {"ok": True, "msg": f"Diff sync deja effectue il y a {int(age_min)} min, rien a faire",
"skipped_all": True, "mode": mode}
except Exception:
pass
# Sauve immediatement le timestamp de DEBUT (couvre annulation + evite de rater
# les updates qui se produisent pendant la sync elle-meme)
if mode == "diff":
try:
start_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
set_secret(db, "qualys_last_diff_sync", start_iso, "Timestamp dernier sync Qualys diff (debut)")
db.commit()
except Exception:
pass
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Credentials Qualys non configurés"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0, "skipped": 0}
# flush eventuel pending
try:
db.commit()
except Exception:
pass
# --- OPTIM 1 : pré-chargement des servers en RAM -----------------------
# Remplace 2 queries SQL par asset par un lookup dict O(1).
servers_by_hostname = {}
servers_by_ip = {}
servers_need_fqdn = set() # ids des servers ayant fqdn NULL ou '' (backfill cible)
try:
for row in db.execute(text(
"SELECT id, LOWER(hostname) AS hn, (fqdn IS NULL OR fqdn='') AS needs_fqdn FROM servers"
)).fetchall():
if row.hn:
servers_by_hostname[row.hn] = row.id
if row.needs_fqdn:
servers_need_fqdn.add(row.id)
for row in db.execute(text(
"SELECT s.id, si.ip_address::text AS ip FROM servers s JOIN server_ips si ON si.server_id=s.id"
)).fetchall():
if row.ip:
servers_by_ip[row.ip] = row.id
except Exception:
pass
# --- OPTIM 2 : HTTP Session réutilisée ---------------------------------
# Évite d'ouvrir une nouvelle connexion SSL + auth à chaque page.
sess = requests.Session()
sess.auth = (qualys_user, qualys_pass)
sess.headers.update({"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
sess.verify = False
if proxies:
sess.proxies.update(proxies)
MAX_PAGES_PER_FILTER = 500 # 500 x 100 = 50 000 assets/filtre (garde-fou large)
def fetch_filter_pages(tag_filter):
"""Récupère tous les blocks XML <HostAsset> pour un filtre tag donné.
Exécuté en thread worker (pas d'accès à `db` → thread-safe)."""
blocks_out = []
local_pages = 0
f_last_id = None
while local_pages < MAX_PAGES_PER_FILTER:
if _refresh_cancel.is_set():
return blocks_out, local_pages
criteria = [{"field": "tagName", "operator": "CONTAINS", "value": tag_filter}]
if f_last_id:
criteria.append({"field": "id", "operator": "GREATER", "value": str(f_last_id)})
if mode == "diff" and last_diff_iso:
criteria.append({"field": "updated", "operator": "GREATER", "value": last_diff_iso})
payload = {"ServiceRequest": {
"preferences": {"limitResults": 100},
"filters": {"Criteria": criteria}
}}
r = None
for attempt in range(3):
try:
r = sess.post(
f"{qualys_url}/qps/rest/5.0/search/am/hostasset",
json=payload, timeout=600)
break
except Exception:
import time as _t
_t.sleep(2 * (attempt + 1))
if r is None or r.status_code != 200 or "SUCCESS" not in r.text:
break
local_pages += 1
page_blocks = r.text.split("<HostAsset>")[1:]
if not page_blocks:
break
new_last_id = f_last_id
for block in page_blocks:
block_end = block.split("</HostAsset>")[0]
blocks_out.append(block_end)
m = re.search(r"<id>(\d+)</id>", block_end)
if m:
aid = int(m.group(1))
if new_last_id is None or aid > new_last_id:
new_last_id = aid
if "<hasMoreRecords>true</hasMoreRecords>" not in r.text or new_last_id == f_last_id:
break
f_last_id = new_last_id
return blocks_out, local_pages
# --- OPTIM 3 : 5 filtres en parallèle ---------------------------------
tag_filters = ["SRV", "server", "SED", "SEI", "EMV"]
all_blocks = []
seen_asset_ids = set()
with ThreadPoolExecutor(max_workers=len(tag_filters)) as executor:
futures = {executor.submit(fetch_filter_pages, tf): tf for tf in tag_filters}
for future in as_completed(futures):
try:
blocks, page_count = future.result()
stats["pages"] += page_count
# Dédup par asset_id (un asset avec tags SRV + server apparait 2 fois)
for b in blocks:
m = re.search(r"<id>(\d+)</id>", b)
if m:
aid = int(m.group(1))
if aid not in seen_asset_ids:
seen_asset_ids.add(aid)
all_blocks.append(b)
except Exception:
stats["errors"] += 1
if _refresh_cancel.is_set():
stats["ok"] = True
stats["cancelled"] = True
stats["msg"] = f"Annulé apres fetch ({stats['pages']} pages, {len(all_blocks)} assets uniques)"
return stats
# --- OPTIM 4 : UPSERT INSERT ... ON CONFLICT DO UPDATE -----------------
# RETURNING (xmax = 0) AS inserted : true = INSERT, false = UPDATE
# (xmax=0 sur une ligne fraichement insérée, != 0 quand update)
UPSERT_SQL = text("""
INSERT INTO qualys_assets
(qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
ON CONFLICT (qualys_asset_id) DO UPDATE SET
name = EXCLUDED.name,
hostname = EXCLUDED.hostname,
fqdn = EXCLUDED.fqdn,
ip_address = EXCLUDED.ip_address,
os = EXCLUDED.os,
os_family = EXCLUDED.os_family,
agent_status = EXCLUDED.agent_status,
agent_version = EXCLUDED.agent_version,
last_checkin = EXCLUDED.last_checkin,
server_id = EXCLUDED.server_id,
updated_at = now()
RETURNING (xmax = 0) AS inserted
""")
FQDN_UPDATE_SQL = text(
"UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"
)
def _valid(h):
if not h:
return False
h = h.strip().lower()
if h in ("localhost", ""):
return False
if re.match(r"^\d+\.\d+\.\d+\.\d+$", h):
return False
return True
# Traitement séquentiel (session DB non thread-safe, mais plus de 2 queries/asset)
for block in all_blocks:
if _refresh_cancel.is_set():
break
try:
asset_id = (parse_xml(block, "id") or [""])[0]
if not asset_id or not asset_id.isdigit():
continue
name = (parse_xml(block, "name") or [""])[0]
address = (parse_xml(block, "address") or [""])[0]
fqdn = (parse_xml(block, "fqdn") or [""])[0]
netbios = (parse_xml(block, "netbiosName") or [""])[0]
os_val = (parse_xml(block, "os") or [""])[0]
hostname_src = name if _valid(name.split(".")[0]) else (fqdn if _valid(fqdn.split(".")[0]) else netbios)
hostname = hostname_src.split(".")[0].lower() if hostname_src else ""
agent_status = ""
agent_version = ""
last_checkin = None
if "<agentInfo>" in block:
ab = block[block.find("<agentInfo>"):block.find("</agentInfo>")]
agent_status = (parse_xml(ab, "status") or [""])[0]
agent_version = (parse_xml(ab, "agentVersion") or [""])[0]
lc = (parse_xml(ab, "lastCheckedIn") or [""])[0]
last_checkin = lc if lc else None
os_family = None
if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")):
os_family = "linux"
elif "windows" in os_val.lower():
os_family = "windows"
# Lookup O(1) au lieu de 2 queries SQL
server_id = servers_by_hostname.get(hostname) if hostname else None
if not server_id and address:
server_id = servers_by_ip.get(address)
# FQDN backfill ciblé (seulement si le server en a besoin)
if server_id and fqdn and server_id in servers_need_fqdn:
try:
db.execute(FQDN_UPDATE_SQL, {"fqdn": fqdn, "sid": server_id})
servers_need_fqdn.discard(server_id)
except Exception:
pass
result = db.execute(UPSERT_SQL, {
"qid": int(asset_id), "name": name, "hn": hostname,
"fqdn": fqdn or None, "ip": address or None,
"os": os_val, "osf": os_family, "ast": agent_status,
"av": agent_version, "lc": last_checkin, "sid": server_id,
})
row = result.fetchone()
if row and row.inserted:
stats["created"] += 1
else:
stats["updated"] += 1
except Exception:
stats["errors"] += 1
# Commit final unique
try:
db.commit()
except Exception:
try: db.rollback()
except Exception: pass
stats["ok"] = True
stats["mode"] = mode
stats["msg"] = (f"[{mode}] {stats['created']} créés, {stats['updated']} mis à jour "
f"({stats['pages']} pages // {len(tag_filters)} filtres, "
f"{len(all_blocks)} assets uniques, {stats['errors']} erreurs)")
if mode == "diff":
try:
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
set_secret(db, "qualys_last_diff_sync", now_iso, "Timestamp dernier sync Qualys diff")
db.commit()
except Exception:
pass
return stats
def cancel_refresh():
"""Demande l'annulation du refresh Qualys en cours"""
_refresh_cancel.set()
return {"ok": True, "msg": "Annulation demandée"}
def is_refresh_running():
return _refresh_running
# ===========================================================================
# DASHBOARD VULNERABILITES — agregation par dimension + historique
# Tables: qualys_vuln_snapshot_run (un row par calcul) + qualys_vuln_snapshot
# Dimensions: global, env, pos, os, domain, env_pos
# Niveaux: critical (sev5), high (sev4), medium (sev3), sain, non_scanne
# Un serveur est compte dans son niveau MAX (1 seule colonne)
# ===========================================================================
_dashboard_running = False
def is_dashboard_running():
return _dashboard_running
def _classify_severity(vc):
if not isinstance(vc, dict):
return "sain"
if vc.get("severity5", 0) > 0:
return "critical"
if vc.get("severity4", 0) > 0:
return "high"
if vc.get("severity3", 0) > 0:
return "medium"
return "sain"
def _is_scanned(asset_row, has_vuln_data):
status = (asset_row.agent_status or "").lower()
if status in ("active", "ok", "status_active"):
return True
if has_vuln_data:
return True
return False
def _fetch_asset_ids_by_tag(db, tag_name):
"""Appelle Qualys API pour recuperer la liste d'asset_ids ayant ce tag."""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return set()
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/search/am/hostasset",
json={"ServiceRequest": {
"preferences": {"limitResults": 1000},
"filters": {"Criteria": [
{"field": "tagName", "operator": "EQUALS", "value": tag_name}
]}
}},
auth=(qualys_user, qualys_pass),
verify=False, timeout=300, proxies=proxies,
headers={"Content-Type": "application/json"}
)
if r.status_code != 200 or "SUCCESS" not in r.text:
print(f"[fetch_tag {tag_name}] HTTP {r.status_code} - {r.text[:200]}")
return set()
ids = set()
for block in r.text.split("<HostAsset>")[1:]:
block = block.split("</HostAsset>")[0]
aid = (parse_xml(block, "id") or [""])[0]
if aid and aid.isdigit():
ids.add(int(aid))
return ids
except Exception as ex:
print(f"[fetch_tag {tag_name}] {type(ex).__name__}: {ex}")
return set()
def compute_vuln_dashboard(db, triggered_by="manual", run_id=None):
"""Calcule un nouveau snapshot. Interroge l'API Qualys par tag pour avoir
les vraies associations asset<->tag (la DB locale qualys_asset_tags peut etre obsolete).
Si run_id fourni, l'utilise (sinon en cree un)."""
import time
from concurrent.futures import ThreadPoolExecutor
t0 = time.time()
try:
# 1. Creer le run en pending si pas deja fourni
if run_id is None:
run_id = db.execute(text("""
INSERT INTO qualys_vuln_snapshot_run (status, triggered_by)
VALUES ('pending', :tb) RETURNING id
"""), {"tb": triggered_by}).scalar()
db.commit()
# 2. Tags a interroger : ENV-*, POS-*, OS-*
tag_rows = db.execute(text("""SELECT name FROM qualys_tags
WHERE name ~ '^(ENV|POS|OS)-' ORDER BY name""")).fetchall()
tag_names = [r.name for r in tag_rows]
# 3. Pour chaque tag, appel API parallele -> {tag_name: set(asset_id)}
def fetch_one(name):
return name, _fetch_asset_ids_by_tag(db, name)
tag_id_map = {}
with ThreadPoolExecutor(max_workers=8) as ex:
for name, ids in ex.map(fetch_one, tag_names):
tag_id_map[name] = ids
# 4. Charger tous les assets locaux (pour total + IPs + agent_status + domain_ltd)
rows = db.execute(text("""
SELECT qa.qualys_asset_id, qa.hostname, qa.ip_address, qa.agent_status,
qa.last_checkin, qa.os_family, qa.server_id, s.domain_ltd
FROM qualys_assets qa
LEFT JOIN servers s ON s.id = qa.server_id
""")).fetchall()
asset_count = len(rows)
# 5. Recuperer vulns par batch de 50 IPs en parallele (cache 10min via get_vuln_counts)
# Une session par thread pour eviter les conflits SQLAlchemy
from app.database import SessionLocal
ip_to_vuln = {}
unique_ips = list({str(r.ip_address) for r in rows
if r.ip_address and str(r.ip_address) != "None"})
batches = [unique_ips[i:i+50] for i in range(0, len(unique_ips), 50)]
def _fetch_vuln_batch(batch):
s = SessionLocal()
try:
return get_vuln_counts(s, ",".join(batch))
except Exception:
return {}
finally:
s.close()
with ThreadPoolExecutor(max_workers=8) as ex:
for vmap in ex.map(_fetch_vuln_batch, batches):
if vmap:
ip_to_vuln.update(vmap)
# 6. Classifier + agreger en utilisant les associations API
agg = {}
def _bump(dim, val, val2, level):
key = (dim, val or "(none)", val2 or "")
if key not in agg:
agg[key] = {"total": 0, "critical": 0, "high": 0, "medium": 0,
"sain": 0, "non_scanne": 0}
agg[key]["total"] += 1
agg[key][level] += 1
for r in rows:
aid = int(r.qualys_asset_id)
ip = str(r.ip_address) if r.ip_address else None
vc = ip_to_vuln.get(ip) if ip else None
scanned = _is_scanned(r, vc is not None)
level = "non_scanne" if not scanned else _classify_severity(vc or {})
envs = sorted(t for t, ids in tag_id_map.items()
if t.startswith("ENV-") and aid in ids)
poses = sorted(t for t, ids in tag_id_map.items()
if t.startswith("POS-") and aid in ids)
oses = sorted(t for t, ids in tag_id_map.items()
if t.startswith("OS-") and aid in ids)
dom = r.domain_ltd or "(sans domaine)"
_bump("global", "all", "", level)
for e in envs or ["(sans env)"]:
_bump("env", e, "", level)
for pos in poses or ["(sans pos)"]:
_bump("pos", pos, "", level)
for o in oses or ["(sans os)"]:
_bump("os", o, "", level)
_bump("domain", dom, "", level)
for e in envs or ["(sans env)"]:
for pos in poses or ["(sans pos)"]:
_bump("env_pos", e, pos, level)
# 7. Persister
for (dim, val, val2), counts in agg.items():
db.execute(text("""
INSERT INTO qualys_vuln_snapshot
(run_id, dimension, dimension_value, dimension_value2,
total, critical, high, medium, sain, non_scanne)
VALUES (:rid, :dim, :v1, :v2, :tot, :c, :h, :m, :s, :ns)
"""), {"rid": run_id, "dim": dim, "v1": val, "v2": val2 or None,
"tot": counts["total"], "c": counts["critical"],
"h": counts["high"], "m": counts["medium"],
"s": counts["sain"], "ns": counts["non_scanne"]})
duration = int(time.time() - t0)
db.execute(text("""UPDATE qualys_vuln_snapshot_run
SET status='ok', asset_count=:ac, duration_sec=:d,
msg=:m WHERE id=:rid"""),
{"ac": asset_count, "d": duration,
"m": f"OK ({len(tag_names)} tags interroges)", "rid": run_id})
db.commit()
return {"ok": True, "msg": "OK", "run_id": run_id,
"asset_count": asset_count, "duration_sec": duration}
except Exception as ex:
db.rollback()
if run_id:
try:
db.execute(text("""UPDATE qualys_vuln_snapshot_run
SET status='error', msg=:m, duration_sec=:d WHERE id=:rid"""),
{"m": str(ex)[:500], "d": int(time.time() - t0), "rid": run_id})
db.commit()
except Exception:
pass
return {"ok": False, "msg": str(ex), "run_id": run_id,
"asset_count": 0, "duration_sec": int(time.time() - t0)}
finally:
pass
def load_vuln_dashboard(db):
"""Charge le dernier run reussi + ses snapshots."""
last_run = db.execute(text("""SELECT id, run_at, asset_count, duration_sec, triggered_by
FROM qualys_vuln_snapshot_run WHERE status='ok' ORDER BY run_at DESC LIMIT 1""")).fetchone()
if not last_run:
return {"global": None, "env": [], "pos": [], "os": [], "domain": [],
"env_pos": [], "last_run": None}
rows = db.execute(text("""SELECT dimension, dimension_value, dimension_value2,
total, critical, high, medium, sain, non_scanne
FROM qualys_vuln_snapshot WHERE run_id=:rid
ORDER BY dimension, dimension_value, dimension_value2"""), {"rid": last_run.id}).fetchall()
out = {"global": None, "env": [], "pos": [], "os": [], "domain": [],
"env_pos": [], "last_run": last_run}
for r in rows:
d = {"name": r.dimension_value, "name2": r.dimension_value2,
"total": r.total, "critical": r.critical, "high": r.high,
"medium": r.medium, "sain": r.sain, "non_scanne": r.non_scanne,
"vuln_total": r.critical + r.high + r.medium,
"scanned": r.total - r.non_scanne}
d["pct_vuln"] = round(100 * d["vuln_total"] / d["scanned"], 1) if d["scanned"] > 0 else 0
if r.dimension == "global":
out["global"] = d
elif r.dimension in out:
out[r.dimension].append(d)
return out
def load_vuln_history(db, period="day", days=30, dimension="global", dimension_value="all"):
"""Retourne l'evolution dans le temps.
period: 'day' (1 point/jour), 'week' (lundi), 'month' (1er du mois)
dimension/dimension_value: filtre (defaut: global/all)
Renvoie list de dicts {bucket, total, critical, high, medium, sain, non_scanne}"""
if period == "month":
bucket_expr = "date_trunc('month', r.run_at)"
elif period == "week":
bucket_expr = "date_trunc('week', r.run_at)"
else:
bucket_expr = "date_trunc('day', r.run_at)"
rows = db.execute(text(f"""
SELECT {bucket_expr} as bucket,
s.total, s.critical, s.high, s.medium, s.sain, s.non_scanne,
r.run_at
FROM qualys_vuln_snapshot s
JOIN qualys_vuln_snapshot_run r ON r.id = s.run_id
WHERE r.status='ok' AND r.run_at >= now() - (:days || ' days')::interval
AND s.dimension = :dim AND s.dimension_value = :dv
ORDER BY r.run_at
"""), {"days": days, "dim": dimension, "dv": dimension_value}).fetchall()
# Garder le dernier snapshot de chaque bucket
by_bucket = {}
for r in rows:
by_bucket[r.bucket] = r
return [{"bucket": b.isoformat(), "total": r.total, "critical": r.critical,
"high": r.high, "medium": r.medium, "sain": r.sain,
"non_scanne": r.non_scanne, "vuln_total": r.critical + r.high + r.medium}
for b, r in sorted(by_bucket.items())]
# ===========================================================================
# DOUBLONS QUALYS — liste + suppression via API
# ===========================================================================
def fetch_all_qualys_assets(db, with_progress=False):
"""Recupere les assets Qualys SERVEURS via API (tags built-in Linux Server + Windows Server).
Pagination via id GREATER, batch 1000. Filtre directement cote API pour eviter de scanner
les ~5000 postes de travail (gain de temps x4).
Retourne liste de dicts {id, name, ip, os, last_check, agent_status}."""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return []
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
def fetch_by_tag(tag_name):
results = []
last_id = 0
while True:
criteria = [{"field": "tagName", "operator": "EQUALS", "value": tag_name}]
if last_id:
criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
body = {"ServiceRequest": {"preferences": {"limitResults": 1000},
"filters": {"Criteria": criteria}}}
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/search/am/hostasset",
json=body, auth=(qualys_user, qualys_pass),
verify=False, timeout=180, proxies=proxies,
headers={"Content-Type": "application/json"}
)
except Exception:
break
if r.status_code != 200 or "SUCCESS" not in r.text:
break
batch = []
for block in r.text.split("<HostAsset>")[1:]:
block = block.split("</HostAsset>")[0]
aid = (parse_xml(block, "id") or [""])[0]
name = (parse_xml(block, "name") or [""])[0]
addr = (parse_xml(block, "address") or [""])[0]
os_str = (parse_xml(block, "os") or [""])[0]
last_check = ""
if "<lastCheckedIn>" in block:
last_check = (parse_xml(block, "lastCheckedIn") or [""])[0]
agent_status = ""
if "<agentInfo>" in block:
agent_status = (parse_xml(block, "status") or [""])[0]
if aid and aid.isdigit():
batch.append({"id": int(aid), "name": name, "ip": addr, "os": os_str,
"last_check": last_check, "agent_status": agent_status})
if not batch:
break
results.extend(batch)
last_id = max(b["id"] for b in batch)
if len(batch) < 1000:
break
return results
# Linux Server + Windows Server (tags built-in Qualys, couvre tous les serveurs)
linux = fetch_by_tag("Linux Server")
windows = fetch_by_tag("Windows Server")
# Dedupe par id (un asset pourrait theoriquement avoir les 2 tags)
seen = set()
out = []
for a in linux + windows:
if a["id"] not in seen:
seen.add(a["id"])
out.append(a)
return out
def find_duplicate_hostnames(db, force_refresh=False):
"""Identifie les hostnames en doublon. Cache 10min."""
cache_key = "qualys:duplicates"
if not force_refresh:
cached = _cache.get(cache_key)
if cached is not None:
return cached
from collections import defaultdict
assets = fetch_all_qualys_assets(db)
# Filtrer les workstations (Windows 10/11/7/8/XP, MacOS Desktop) - on veut serveurs uniquement
WKS_PATTERNS = ("Windows 10", "Windows 11", "Windows 7", "Windows 8", "Windows XP",
"Windows Vista", "macOS")
def is_workstation(os_str):
if not os_str:
return False
# Server est explicite, sinon les patterns workstation
if "Server" in os_str:
return False
return any(p in os_str for p in WKS_PATTERNS)
groups = defaultdict(list)
for a in assets:
shortname = a["name"].split(".")[0].lower() if a["name"] else ""
if not shortname:
continue
# Filtrer les hostnames qui sont des IP (ex: "192", "10")
if shortname.replace(".", "").isdigit():
continue
# Filtrer les workstations
if is_workstation(a.get("os", "")):
continue
groups[shortname].append(a)
dupes = []
for shortname, items in groups.items():
if len(items) <= 1:
continue
# Trier par last_check desc (plus recent en premier)
items.sort(key=lambda x: x["last_check"] or "", reverse=True)
dupes.append({
"shortname": shortname,
"count": len(items),
"assets": items,
"newest_check": items[0]["last_check"] or "",
"oldest_check": items[-1]["last_check"] or "",
})
# Trier par count desc puis shortname
dupes.sort(key=lambda x: (-x["count"], x["shortname"]))
result = {"total_assets": len(assets), "duplicate_hostnames": len(dupes),
"total_zombies": sum(d["count"] - 1 for d in dupes), "groups": dupes}
_cache.set(cache_key, result, CACHE_TTL)
return result
def delete_qualys_asset(db, asset_id):
"""Supprime un asset Qualys via API. Retourne {ok, msg}."""
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
if not qualys_user:
return {"ok": False, "msg": "Pas de creds"}
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
try:
r = requests.post(
f"{qualys_url}/qps/rest/5.0/delete/am/hostasset/{int(asset_id)}",
auth=(qualys_user, qualys_pass), verify=False, timeout=60, proxies=proxies,
headers={"Content-Type": "application/json"}
)
if r.status_code == 200 and "SUCCESS" in r.text:
# Invalider le cache pour forcer un refresh
_cache.delete("qualys:duplicates")
return {"ok": True, "msg": "Supprime"}
return {"ok": False, "msg": f"HTTP {r.status_code}: {r.text[:300]}"}
except Exception as e:
return {"ok": False, "msg": str(e)}