patchcenter/app/services/qualys_service.py

796 lines
33 KiB
Python

"""Service Qualys — sync tags pour un serveur via API + cache memoire"""
import re
import requests
import threading
# Module-level state coordinating the bulk agent refresh (refresh_all_agents):
# - _refresh_lock: taken non-blocking so only one refresh runs at a time.
# - _refresh_running: flag reported by is_refresh_running().
# - _refresh_cancel: event set by cancel_refresh() and polled by the refresh loop.
_refresh_lock = threading.Lock()
_refresh_running = False
_refresh_cancel = threading.Event()
import urllib3
from sqlalchemy import text
from .secrets_service import get_secret
from . import cache as _cache
# Silence urllib3 warnings: the HTTP calls in this module are made with verify=False.
urllib3.disable_warnings()
CACHE_TTL = 600 # 10 minutes
def _get_qualys_creds(db):
    """Load the Qualys connection settings from the encrypted secrets store.

    Returns:
        tuple: ``(url, user, password, proxy)``. Missing secrets become empty
        strings, except the URL which falls back to the EU platform endpoint.
        The proxy is blanked when ``qualys_bypass_proxy`` is "true"
        (case-insensitive).
    """
    def _secret(key, fallback=""):
        # Treat a missing or empty secret as the fallback value.
        return get_secret(db, key) or fallback

    base_url = _secret("qualys_url", "https://qualysapi.qualys.eu")
    username = _secret("qualys_user")
    password = _secret("qualys_pass")
    proxy_url = _secret("qualys_proxy")
    bypass_proxy = _secret("qualys_bypass_proxy").lower() == "true"
    return base_url, username, password, ("" if bypass_proxy else proxy_url)
def parse_xml(txt, tag):
    """Return the text content of every ``<tag>...</tag>`` element in *txt*.

    This is a lightweight regex scan, not a real XML parser: it only matches
    attribute-less elements whose content holds no markup (content starting
    with ``<``, such as child elements or CDATA sections, does not match), and
    it ignores nesting depth.

    Args:
        txt: XML text to scan.
        tag: Literal element name.

    Returns:
        list[str]: matched contents in document order (possibly empty).
    """
    # Escape the name so it is matched literally instead of being interpreted
    # as a regular expression (e.g. "a.b" must not match "<aXb>").
    name = re.escape(tag)
    return re.findall(f"<{name}>([^<]*)</{name}>", txt)
def search_assets_api(db, query, field="name", operator="CONTAINS", force_refresh=False):
    """Search Qualys host assets through the QPS API, caching results for CACHE_TTL.

    Args:
        db: Database handle used to read the Qualys credentials.
        query: Value to search for.
        field: Qualys field the criterion applies to.
        operator: Qualys comparison operator for the criterion.
        force_refresh: When true, skip the cache lookup and query the API.

    Returns:
        dict: ``{"ok", "msg", "assets"}`` plus ``"from_cache"`` on success.
        Only successful responses are cached.
    """
    # The operator is part of the key: the same field/query with another
    # operator (e.g. EQUALS vs CONTAINS) yields a different result set.
    cache_key = f"qualys:search:{field}:{operator}:{query}"
    if not force_refresh:
        cached = _cache.get(cache_key)
        if cached is not None:
            # Annotate a copy: mutating the stored entry would append
            # " (cache)" once more on every subsequent hit.
            hit = dict(cached)
            hit["msg"] = hit.get("msg", "") + " (cache)"
            hit["from_cache"] = True
            return hit
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configurés", "assets": []}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 200},
                "filters": {"Criteria": [
                    {"field": field, "operator": operator, "value": query}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}", "assets": []}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"API HTTP {r.status_code}", "assets": []}
    assets = _parse_assets_full(r.text)
    result = {"ok": True, "msg": f"{len(assets)} résultat(s)", "assets": assets, "from_cache": False}
    _cache.set(cache_key, result, CACHE_TTL)
    return result
def get_all_tags_api(db):
    """Fetch every tag known to Qualys through the QPS tag search endpoint.

    Returns:
        dict: ``{"ok", "msg", "tags"}`` where each tag is
        ``{"id", "name", "is_dynamic", "rule_type"}``; a tag counts as dynamic
        when the response carries a non-empty ``ruleType`` for it.
    """
    qualys_url, username, password, proxy_url = _get_qualys_creds(db)
    if not username:
        return {"ok": False, "msg": "Credentials non configurés", "tags": []}

    proxy_map = None
    if proxy_url:
        proxy_map = {"https": proxy_url, "http": proxy_url}

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/tag",
            json={"ServiceRequest": {"preferences": {"limitResults": 10000}}},
            auth=(username, password),
            verify=False,
            timeout=60,
            proxies=proxy_map,
            headers={"Content-Type": "application/json"},
        )
    except Exception as e:
        return {"ok": False, "msg": str(e), "tags": []}

    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"HTTP {r.status_code}", "tags": []}

    def _first(fragment, element):
        # First occurrence of the element in the fragment, or "" when absent.
        found = parse_xml(fragment, element)
        return found[0] if found else ""

    tags = []
    for chunk in r.text.split("<Tag>")[1:]:
        fragment = chunk.partition("</Tag>")[0]
        tag_id = _first(fragment, "id")
        tag_name = _first(fragment, "name")
        rule = _first(fragment, "ruleType")
        if not (tag_id and tag_name):
            continue
        tags.append({"id": int(tag_id), "name": tag_name, "is_dynamic": bool(rule), "rule_type": rule})
    return {"ok": True, "msg": f"{len(tags)} tags", "tags": tags}
def _parse_assets_full(text):
    """Parse a Qualys HostAsset search response into a list of enriched dicts.

    Args:
        text: Raw XML body of the response. (The parameter name shadows the
            module-level SQLAlchemy ``text`` inside this function only.)

    Returns:
        list[dict]: one dict per ``<HostAsset>`` with keys ``qualys_asset_id``,
        ``name``, ``hostname``, ``fqdn``, ``ip_address``, ``os``,
        ``agent_status``, ``agent_version``, ``last_checkin``, ``tags`` and
        ``tags_list``. ``hostname`` is the lower-cased first label of the
        FQDN, NetBIOS name or asset name (first non-empty one).
    """
    def first(fragment, element):
        # First occurrence of the element in the fragment, or "" when absent.
        found = parse_xml(fragment, element)
        return found[0] if found else ""

    assets = []
    for chunk in text.split("<HostAsset>")[1:]:
        block = chunk.split("</HostAsset>")[0]
        aid = first(block, "id")
        name = first(block, "name")
        fqdn = first(block, "fqdn")
        address = first(block, "address")
        os_val = first(block, "os")
        agent_status = ""
        agent_version = ""
        last_checkin = ""
        if "<agentInfo>" in block:
            # Read agent fields from the <agentInfo> section only, as the bulk
            # refresh does, so a <status> element elsewhere in the asset is
            # never mistaken for the agent status.
            agent_block = block.split("<agentInfo>")[1].split("</agentInfo>")[0]
            agent_status = first(agent_block, "status")
            agent_version = first(agent_block, "agentVersion")
            last_checkin = first(agent_block, "lastCheckedIn")
        # Tags
        tags = []
        if "<tags>" in block:
            tag_block = block.split("<tags>")[1].split("</tags>")[0]
            tags = parse_xml(tag_block, "name")
        netbios = first(block, "netbiosName")
        hostname_src = fqdn or netbios or name
        hostname = hostname_src.split(".")[0].lower() if hostname_src else ""
        assets.append({
            # A non-numeric id is reported as None instead of aborting the parse.
            "qualys_asset_id": int(aid) if aid.isdigit() else None,
            "name": name, "hostname": hostname, "fqdn": fqdn,
            "ip_address": address, "os": os_val,
            "agent_status": agent_status, "agent_version": agent_version,
            "last_checkin": last_checkin, "tags": tags,
            "tags_list": ", ".join(tags),
        })
    return assets
def create_tag_api(db, tag_name):
    """Create a static tag in Qualys and mirror it into the local ``qualys_tags`` table.

    Args:
        db: Database session used for credentials and the local upsert.
        tag_name: Name of the tag to create.

    Returns:
        dict: ``{"ok": bool, "msg": str}``. Never raises: API and database
        errors are reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/create/am/tag",
            json={"ServiceRequest": {"data": {"Tag": {"name": tag_name}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}
    tid = (parse_xml(r.text, "id") or [""])[0]
    if tid:
        try:
            db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic) VALUES (:tid, :n, false)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name
"""), {"tid": int(tid), "n": tag_name})
            db.commit()
        except Exception as e:
            # A failed statement leaves the transaction aborted; roll back so
            # the caller's session stays usable for later queries.
            try:
                db.rollback()
            except Exception:
                pass  # connection unusable; the original error is reported below
            return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": f"Tag '{tag_name}' créé (ID: {tid})"}
def delete_tag_api(db, qualys_tag_id):
    """Delete a tag in Qualys, then remove it and its asset links from the local tables.

    Args:
        db: Database session used for credentials and the local cleanup.
        qualys_tag_id: Qualys identifier of the tag.

    Returns:
        dict: ``{"ok": bool, "msg": str}``. Never raises: API and database
        errors are reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/delete/am/tag/{qualys_tag_id}",
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}
    try:
        # Links first, then the tag itself.
        db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id})
        db.execute(text("DELETE FROM qualys_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id})
        db.commit()
    except Exception as e:
        # A failed statement leaves the transaction aborted; roll back so the
        # caller's session stays usable for later queries.
        try:
            db.rollback()
        except Exception:
            pass  # connection unusable; the original error is reported below
        return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": "Tag supprimé"}
def add_tag_to_asset_api(db, asset_id, tag_id):
    """Attach a tag to a host asset in Qualys and record the link locally.

    Args:
        db: Database session used for credentials and the local insert.
        asset_id: Qualys host asset identifier.
        tag_id: Qualys tag identifier.

    Returns:
        dict: ``{"ok": bool, "msg": str}``. Never raises: API and database
        errors are reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"add": {"TagSimple": {"id": tag_id}}}}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}
    try:
        db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:aid, :tid) ON CONFLICT DO NOTHING
"""), {"aid": asset_id, "tid": tag_id})
        db.commit()
    except Exception as e:
        # A failed statement leaves the transaction aborted; roll back so the
        # caller's session stays usable for later queries.
        try:
            db.rollback()
        except Exception:
            pass  # connection unusable; the original error is reported below
        return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": "Tag ajouté"}
def remove_tag_from_asset_api(db, asset_id, tag_id):
    """Detach a tag from a host asset in Qualys and delete the local link.

    Args:
        db: Database session used for credentials and the local delete.
        asset_id: Qualys host asset identifier.
        tag_id: Qualys tag identifier.

    Returns:
        dict: ``{"ok": bool, "msg": str}``. Never raises: API and database
        errors are reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"remove": {"TagSimple": {"id": tag_id}}}}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}
    try:
        db.execute(text("""
DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :aid AND qualys_tag_id = :tid
"""), {"aid": asset_id, "tid": tag_id})
        db.commit()
    except Exception as e:
        # A failed statement leaves the transaction aborted; roll back so the
        # caller's session stays usable for later queries.
        try:
            db.rollback()
        except Exception:
            pass  # connection unusable; the original error is reported below
        return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": "Tag retiré"}
def resync_all_tags(db):
    """Upsert every tag returned by the Qualys API into the local ``qualys_tags`` table.

    Returns:
        dict: the failed API result unchanged when the fetch did not succeed,
        otherwise ``{"ok": True, "msg": ...}`` with the number of tags written.
    """
    result = get_all_tags_api(db)
    if not result["ok"]:
        return result

    upsert_sql = text("""
INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic, rule_type)
VALUES (:tid, :n, :dyn, :rt)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name, is_dynamic = EXCLUDED.is_dynamic,
rule_type = EXCLUDED.rule_type, updated_at = now()
""")
    count = 0
    for tag in result["tags"]:
        params = {
            "tid": tag["id"],
            "n": tag["name"],
            "dyn": tag["is_dynamic"],
            "rt": tag.get("rule_type"),
        }
        db.execute(upsert_sql, params)
        count += 1
    # Single commit once all tags have been written.
    db.commit()
    return {"ok": True, "msg": f"{count} tags synchronisés"}
def sync_server_qualys(db, server_id):
    """Synchronise the Qualys asset data and tags of one server.

    Looks the server up in ``servers``, resolves its Qualys asset (by stored
    ``qualys_asset_id`` or, failing that, by hostname), fetches the asset from
    the QPS API, upserts ``qualys_assets``, enriches ``servers`` and rebuilds
    the asset's tag links.

    Args:
        db: Database session.
        server_id: Primary key of the row in ``servers``.

    Returns:
        dict: ``{"ok": False, "msg": ...}`` on failure, otherwise
        ``{"ok": True, "msg": ..., "tags": <number of tags linked>}``.
    """
    row = db.execute(text(
        "SELECT hostname, qualys_asset_id FROM servers WHERE id = :id"
    ), {"id": server_id}).fetchone()
    if not row:
        return {"ok": False, "msg": "Serveur introuvable"}
    hostname = row.hostname
    qid = row.qualys_asset_id
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configures (Settings)"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    # No stored asset id yet: look the asset up by hostname and remember it.
    if not qid:
        qid = _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies)
        if not qid:
            return {"ok": False, "msg": f"Asset '{hostname}' non trouve dans Qualys"}
        db.execute(text("UPDATE servers SET qualys_asset_id = :qid WHERE id = :id"),
                   {"qid": qid, "id": server_id})
    # Fetch the full asset, tags included.
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "filters": {"Criteria": [
                    {"field": "id", "operator": "EQUALS", "value": str(qid)}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}"}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"API HTTP {r.status_code}"}
    # Parse the first asset of the response.
    blocks = r.text.split("<HostAsset>")
    if len(blocks) < 2:
        return {"ok": False, "msg": "Asset non trouve dans la reponse"}
    block = blocks[1].split("</HostAsset>")[0]
    fqdn = (parse_xml(block, "fqdn") or [""])[0]
    address = (parse_xml(block, "address") or [""])[0]
    os_val = (parse_xml(block, "os") or [""])[0]
    agent_status = ""
    agent_version = ""
    last_checkin = None
    if "<agentInfo>" in block:
        # Read agent fields from the <agentInfo> section only, as the bulk
        # refresh does, so a <status> element elsewhere in the asset is never
        # mistaken for the agent status.
        agent_block = block.split("<agentInfo>")[1].split("</agentInfo>")[0]
        agent_status = (parse_xml(agent_block, "status") or [""])[0]
        agent_version = (parse_xml(agent_block, "agentVersion") or [""])[0]
        last_checkin = (parse_xml(agent_block, "lastCheckedIn") or [""])[0] or None
    # Derive a coarse OS family from the OS string.
    os_family = None
    os_low = os_val.lower()
    if any(k in os_low for k in ("linux", "red hat", "centos", "debian")):
        os_family = "linux"
    elif "windows" in os_low:
        os_family = "windows"
    # Upsert qualys_assets (name, hostname and server_id are only set on insert).
    db.execute(text("""
INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
ON CONFLICT (qualys_asset_id) DO UPDATE SET
fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address, os=EXCLUDED.os,
os_family=EXCLUDED.os_family, agent_status=EXCLUDED.agent_status,
agent_version=EXCLUDED.agent_version, last_checkin=EXCLUDED.last_checkin, updated_at=now()
"""), {"qid": qid, "name": hostname, "hn": hostname.split(".")[0].lower(),
       "fqdn": fqdn or None, "ip": address or None, "os": os_val, "osf": os_family,
       "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
    # Enrich servers: only overwrite with non-empty values.
    db.execute(text("""
UPDATE servers SET
fqdn = COALESCE(NULLIF(:fqdn, ''), fqdn),
os_version = COALESCE(NULLIF(:os, ''), os_version)
WHERE id = :id
"""), {"fqdn": fqdn, "os": os_val, "id": server_id})
    # Tags
    tag_count = 0
    if "<tags>" in block:
        tag_block = block.split("<tags>")[1].split("</tags>")[0]
        tag_ids = parse_xml(tag_block, "id")
        tag_names = parse_xml(tag_block, "name")
        # Drop the previous links, then rebuild them from the response.
        db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"), {"qid": qid})
        for tid, tname in zip(tag_ids, tag_names):
            # Upsert the tag itself.
            db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name) VALUES (:tid, :tn)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name=EXCLUDED.name, updated_at=now()
"""), {"tid": int(tid), "tn": tname})
            # Asset-tag link.
            db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:qid, :tid) ON CONFLICT DO NOTHING
"""), {"qid": qid, "tid": int(tid)})
            tag_count += 1
    db.commit()
    return {"ok": True, "msg": f"Synchro OK — {tag_count} tags", "tags": tag_count}
def _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies=None):
    """Look up a Qualys host asset id by hostname.

    The API search uses CONTAINS, so several assets may come back (searching
    "srv1" also returns "srv10"). An asset whose short name equals the short
    form of *hostname* (case-insensitive) is preferred; when none matches
    exactly, the first asset of the response is returned.

    Args:
        qualys_url: Base URL of the Qualys API.
        qualys_user: API user name.
        qualys_pass: API password.
        hostname: Hostname to search for.
        proxies: Optional ``requests`` proxies mapping.

    Returns:
        int | None: the asset id, or None when nothing was found or the
        request failed (best-effort lookup: errors are not raised).
    """
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 5},
                "filters": {"Criteria": [
                    {"field": "name", "operator": "CONTAINS", "value": hostname}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
    except Exception:
        # Best effort: a failed request is reported as "not found".
        return None
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return None
    wanted = (hostname or "").split(".")[0].lower()
    first_id = None
    for chunk in r.text.split("<HostAsset>")[1:]:
        block = chunk.split("</HostAsset>")[0]
        aid = (parse_xml(block, "id") or [""])[0]
        if not aid.isdigit():
            continue
        if first_id is None:
            first_id = int(aid)
        name = (parse_xml(block, "name") or [""])[0]
        if wanted and name.split(".")[0].lower() == wanted:
            return int(aid)
    return first_id
def get_vuln_counts(db, ip_list, force_refresh=False):
    """Count active severity 3/4/5 detections per host for a set of IPs.

    Args:
        db: Database handle used to read the Qualys credentials.
        ip_list: IP addresses as a comma-separated string.
        force_refresh: When true, skip the cache lookup.

    Returns:
        dict: ``{ip: {"severity3", "severity4", "severity5", "total",
        "confirmed", "potential"}}``; empty when credentials or IPs are
        missing or the request fails. Only successful responses are cached.
    """
    cache_key = f"qualys:vulns:{ip_list}"
    if not force_refresh:
        hit = _cache.get(cache_key)
        if hit is not None:
            return hit

    qualys_url, username, password, proxy_url = _get_qualys_creds(db)
    if not (username and ip_list):
        return {}
    proxy_map = {"https": proxy_url, "http": proxy_url} if proxy_url else None

    form = {
        "action": "list",
        "ips": str(ip_list),
        "severities": "3,4,5",
        "status": "New,Active,Re-Opened",
        "show_results": "0",
        "output_format": "XML",
    }
    try:
        response = requests.post(
            f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/",
            data=form,
            auth=(username, password),
            verify=False, timeout=300, proxies=proxy_map,
            headers={"X-Requested-With": "Python"},
        )
    except Exception:
        return {}
    if response.status_code != 200:
        return {}

    counter_keys = ("severity3", "severity4", "severity5", "total", "confirmed", "potential")
    results = {}
    for host_chunk in response.text.split("<HOST>")[1:]:
        host_xml = host_chunk.partition("</HOST>")[0]
        ips = parse_xml(host_xml, "IP")
        host_ip = ips[0] if ips else ""
        if not host_ip:
            continue
        counts = dict.fromkeys(counter_keys, 0)
        for det_chunk in host_xml.split("<DETECTION>")[1:]:
            det_xml = det_chunk.partition("</DETECTION>")[0]
            raw_severity = (parse_xml(det_xml, "SEVERITY") or ["0"])[0]
            det_type = (parse_xml(det_xml, "TYPE") or [""])[0]
            sev = int(raw_severity) if raw_severity.isdigit() else 0
            # Ignore low severities and anything that is neither confirmed nor potential.
            if sev < 3 or det_type not in ("Confirmed", "Potential"):
                continue
            counts["total"] += 1
            if sev in (3, 4, 5):
                counts[f"severity{sev}"] += 1
            counts[det_type.lower()] += 1
        results[host_ip] = counts

    _cache.set(cache_key, results, CACHE_TTL)
    return results
def get_activation_keys(db, force_refresh=False):
    """Return the Qualys agent activation keys (cache-only by default).

    Args:
        db: Database handle used to read the Qualys credentials.
        force_refresh: When false, only the cache is consulted so that a page
            load never triggers an API call. When true, the API is queried
            and the cache replaced; if that fails, the previously cached
            value (if any) is returned.

    Returns:
        list[dict]: keys as ``{"id", "key", "status", "title", "used",
        "type"}`` (all values are strings), or ``[]`` when nothing is
        available.
    """
    cache_key = "qualys:actkeys"
    cached = _cache.get(cache_key)
    fallback = cached if cached is not None else []
    if not force_refresh:
        # No API call on page load: serve the cache or nothing.
        return fallback
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return fallback
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/1.0/search/ca/agentactkey",
            json={"ServiceRequest": {"preferences": {"limitResults": 20}}},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=5, proxies=proxies,
            headers={"X-Requested-With": "Python", "Content-Type": "application/json"})
    except Exception:
        return fallback
    if r.status_code != 200:
        return fallback

    def first(fragment, element, default=""):
        # First occurrence of the element in the fragment, or the default.
        found = parse_xml(fragment, element)
        return found[0] if found else default

    keys = []
    for chunk in r.text.split("<AgentActKey>")[1:]:
        block = chunk.split("</AgentActKey>")[0]
        keys.append({
            "id": first(block, "id"),
            "key": first(block, "activationKey"),
            "status": first(block, "status"),
            "title": first(block, "title"),
            "used": first(block, "countUsed", "0"),
            "type": first(block, "type"),
        })
    _cache.set(cache_key, keys, 86400)  # 24 h: keys are cached for a day
    return keys
def get_agents_summary(db):
    """Summarise installed agents from the ``qualys_assets`` rows stored locally.

    Returns:
        dict: ``statuses`` (rows per agent status), ``versions`` (top 20 agent
        versions), ``total_assets``, ``active`` and ``inactive`` counts.
    """
    def _scalar(sql):
        # Run a single-value query.
        return db.execute(text(sql)).scalar()

    status_rows = db.execute(text("""
SELECT agent_status, COUNT(*) as cnt,
COUNT(*) FILTER (WHERE agent_version IS NOT NULL AND agent_version != '') as with_version
FROM qualys_assets
WHERE agent_status IS NOT NULL AND agent_status != ''
GROUP BY agent_status ORDER BY cnt DESC
""")).fetchall()
    version_rows = db.execute(text("""
SELECT agent_version, COUNT(*) as cnt
FROM qualys_assets
WHERE agent_version IS NOT NULL AND agent_version != ''
GROUP BY agent_version ORDER BY cnt DESC LIMIT 20
""")).fetchall()
    total_assets = _scalar("SELECT COUNT(*) FROM qualys_assets")
    # "inactive" contains "active", hence the explicit exclusion in the first count.
    active_count = _scalar("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%active%%' AND agent_status NOT ILIKE '%%inactive%%'")
    inactive_count = _scalar("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%inactive%%'")
    return {
        "statuses": status_rows,
        "versions": version_rows,
        "total_assets": total_assets,
        "active": active_count,
        "inactive": inactive_count,
    }
def invalidate_search_cache():
    """Drop every cached Qualys asset-search and vulnerability-count entry."""
    for prefix in ("qualys:search:", "qualys:vulns:"):
        _cache.clear_prefix(prefix)
def get_cache_stats():
    """Return the statistics reported by the shared cache module."""
    cache_stats = _cache.stats()
    return cache_stats
def refresh_all_agents(db, mode="diff"):
    """Refresh the agents from the Qualys QPS API; only one refresh runs at a time.

    Args:
        db: Database session handed to the implementation.
        mode: ``"diff"`` (default) only pulls assets changed since the last
            diff sync and is short enough for a frequent cron; ``"full"``
            pulls every asset matching the tag filters and is long, meant for
            weekly housekeeping.

    Returns:
        dict: the implementation's result, or ``{"ok": False, ..., "busy":
        True}`` when another refresh already holds the lock.
    """
    global _refresh_running
    got_lock = _refresh_lock.acquire(blocking=False)
    if not got_lock:
        return {"ok": False, "msg": "Une synchronisation Qualys est déjà en cours", "busy": True}
    try:
        _refresh_running = True
        # Start from a clean slate: forget any earlier cancellation request.
        _refresh_cancel.clear()
        return _refresh_all_agents_impl(db, mode=mode)
    finally:
        _refresh_running = False
        _refresh_lock.release()
def _refresh_all_agents_impl(db, mode="diff"):
    """Actual refresh implementation; refresh_all_agents calls it while holding the lock.

    Pages through the Qualys QPS host-asset search once per tag filter and
    inserts/updates rows in ``qualys_assets``, linking each asset to a row of
    ``servers`` by hostname first, then by IP address.

    Args:
        db: Database session (must support execute, commit and begin_nested).
        mode: "diff" additionally filters on ``updated`` greater than the
            stored ``qualys_last_diff_sync`` secret and records a new
            timestamp; any other value pulls every asset matching the tag
            filters.

    Returns:
        dict: counters ``created``/``updated``/``errors``/``pages`` (plus
        ``skipped`` when rows were skipped) together with ``ok`` and ``msg``.
        ``cancelled`` is set when the cancel event interrupted the run and
        ``skipped_all`` when the previous diff sync is less than 5 minutes old.
    """
    from .secrets_service import get_secret, set_secret
    from datetime import datetime, timezone
    # Diff mode: fetch the timestamp of the last diff sync
    last_diff_iso = None
    if mode == "diff":
        last_diff_iso = get_secret(db, "qualys_last_diff_sync")
        # Early exit (diff mode only) when assets exist and the last diff sync is recent.
        # NOTE(review): the original comment said "< 30 min" but the threshold below is 5 minutes.
        total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar() or 0
        if total > 0 and last_diff_iso:
            try:
                last_dt = datetime.fromisoformat(last_diff_iso.replace("Z", "+00:00"))
                age_min = (datetime.now(timezone.utc) - last_dt).total_seconds() / 60
                if age_min < 5:
                    return {"ok": True, "msg": f"Diff sync deja effectue il y a {int(age_min)} min, rien a faire",
                            "skipped_all": True, "mode": mode}
            except Exception:
                # Unparseable timestamp: ignore it and run the sync.
                pass
    # Store the START timestamp right away (covers a cancellation mid-run)
    if mode == "diff":
        try:
            start_iso = datetime.now(timezone.utc).isoformat()
            set_secret(db, "qualys_last_diff_sync", start_iso, "Timestamp dernier sync Qualys diff (debut)")
            db.commit()
        except Exception:
            pass
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    stats = {"created": 0, "updated": 0, "errors": 0, "pages": 0}
    last_id = None
    max_pages = 30 # intended per-filter page cap; NOTE(review): assigned but never checked in the loop below
    # Commit whatever is pending on the current session before starting
    try:
        db.commit() # flush any pending
    except Exception:
        pass
    # Multi-pass: several tagName CONTAINS filters to cover the heterogeneous
    # naming scheme (SRV, server, SED, SEI, EMV...).
    # Assets matched by several filters are handled by the update-if-exists logic below.
    tag_filters = ["SRV", "server", "SED", "SEI", "EMV"]
    current_filter_idx = 0
    last_id = None
    while current_filter_idx < len(tag_filters):
        tag_filter = tag_filters[current_filter_idx]
        # Cooperative cancellation, checked once per page.
        if _refresh_cancel.is_set():
            stats["ok"] = True
            stats["cancelled"] = True
            stats["msg"] = f"Annulé apres {stats['pages']} pages (filtre={tag_filter}): {stats['created']} créés, {stats['updated']} mis à jour"
            return stats
        stats["pages"] += 1
        criteria = [{"field": "tagName", "operator": "CONTAINS", "value": tag_filter}]
        # Keyset pagination: only ask for ids above the highest one seen so far.
        if last_id:
            criteria.append({"field": "id", "operator": "GREATER", "value": str(last_id)})
        # Diff mode: filter on 'updated' > last sync
        # NOTE(review): assumes Qualys accepts the stored ISO timestamp format for this field — confirm.
        if mode == "diff" and last_diff_iso:
            criteria.append({"field": "updated", "operator": "GREATER", "value": last_diff_iso})
        payload = {"ServiceRequest": {
            "preferences": {"limitResults": 100},
            "filters": {"Criteria": criteria}
        }}
        # Retry on transient proxy/network errors (3 attempts max)
        r = None
        last_err = None
        for attempt in range(3):
            try:
                r = requests.post(
                    f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
                    json=payload, auth=(qualys_user, qualys_pass),
                    verify=False, timeout=600, proxies=proxies,
                    headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
                break
            except Exception as e:
                last_err = e
                import time as _t
                _t.sleep(2 * (attempt + 1)) # linear backoff: 2s, 4s, 6s (also sleeps after the last failed attempt)
        if r is None:
            return {"ok": False, "msg": f"page {stats['pages']} apres 3 essais: {last_err}", **stats}
        if r.status_code != 200 or "SUCCESS" not in r.text:
            return {"ok": False, "msg": f"HTTP {r.status_code} page {stats['pages']}", **stats}
        blocks = r.text.split("<HostAsset>")[1:]
        if not blocks:
            # Empty page: leaves the while loop entirely (remaining filters are not queried).
            break
        new_last_id = last_id
        for block in blocks:
            block = block.split("</HostAsset>")[0]
            try:
                asset_id = (parse_xml(block, "id") or [""])[0]
                # Track the highest numeric id of the page for the next request.
                if asset_id and asset_id.isdigit():
                    aid_int = int(asset_id)
                    if new_last_id is None or aid_int > new_last_id:
                        new_last_id = aid_int
                name = (parse_xml(block, "name") or [""])[0]
                address = (parse_xml(block, "address") or [""])[0]
                fqdn = (parse_xml(block, "fqdn") or [""])[0]
                netbios = (parse_xml(block, "netbiosName") or [""])[0]
                os_val = (parse_xml(block, "os") or [""])[0]
                # Prefer name (Qualys display name) unless it is empty, "localhost" or an IP -> FQDN -> NetBIOS
                import re as _re
                def _valid(h):
                    if not h:
                        return False
                    h = h.strip().lower()
                    if h in ("localhost", ""):
                        return False
                    if _re.match(r"^\d+\.\d+\.\d+\.\d+$", h):
                        return False
                    return True
                hostname_src = name if _valid(name.split(".")[0]) else (fqdn if _valid(fqdn.split(".")[0]) else netbios)
                hostname = hostname_src.split(".")[0].lower() if hostname_src else ""
                agent_status = ""
                agent_version = ""
                last_checkin = None
                if "<agentInfo>" in block:
                    # Restrict agent fields to the <agentInfo> section.
                    ab = block[block.find("<agentInfo>"):block.find("</agentInfo>")]
                    agent_status = (parse_xml(ab, "status") or [""])[0]
                    agent_version = (parse_xml(ab, "agentVersion") or [""])[0]
                    lc = (parse_xml(ab, "lastCheckedIn") or [""])[0]
                    last_checkin = lc if lc else None
                os_family = None
                if any(k in os_val.lower() for k in ("linux", "red hat", "centos", "debian")):
                    os_family = "linux"
                elif "windows" in os_val.lower():
                    os_family = "windows"
                # Savepoint so that a failure on one asset does not affect the others
                try:
                    sp = db.begin_nested()
                    srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"),
                                     {"h": hostname}).fetchone()
                    server_id = srv.id if srv else None
                    # Fallback: match by IP when the hostname does not match
                    if not server_id and address:
                        ip_match = db.execute(text(
                            "SELECT s.id FROM servers s JOIN server_ips si ON si.server_id=s.id "
                            "WHERE si.ip_address = CAST(:ip AS inet) LIMIT 1"
                        ), {"ip": address}).fetchone()
                        if ip_match:
                            server_id = ip_match.id
                    # Fill in the server's FQDN only when it is still empty.
                    if server_id and fqdn:
                        db.execute(text("UPDATE servers SET fqdn=:fqdn WHERE id=:sid AND (fqdn IS NULL OR fqdn='')"),
                                   {"fqdn": fqdn, "sid": server_id})
                    existing = db.execute(text("SELECT id, updated_at FROM qualys_assets WHERE qualys_asset_id=:qid"),
                                          {"qid": int(asset_id)}).fetchone()
                    # Skip rows that were updated recently.
                    # NOTE(review): the original comment said "last 5 minutes" but the threshold below is 2400 s (40 min).
                    if existing and existing.updated_at:
                        from datetime import datetime, timezone, timedelta
                        try:
                            age = (datetime.now(timezone.utc) - existing.updated_at).total_seconds()
                            if age < 2400:
                                stats["skipped"] = stats.get("skipped", 0) + 1
                                sp.commit()
                                continue
                        except Exception:
                            # Age could not be computed: fall through and update the row.
                            pass
                    if existing:
                        db.execute(text("""UPDATE qualys_assets SET
name=:name, hostname=:hn, fqdn=:fqdn, ip_address=:ip, os=:os, os_family=:osf,
agent_status=:ast, agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now()
WHERE qualys_asset_id=:qid"""),
                                   {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
                                    "ip": address or None, "os": os_val, "osf": os_family,
                                    "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
                        stats["updated"] += 1
                    else:
                        db.execute(text("""INSERT INTO qualys_assets
(qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""),
                                   {"qid": int(asset_id), "name": name, "hn": hostname, "fqdn": fqdn or None,
                                    "ip": address or None, "os": os_val, "osf": os_family,
                                    "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
                        stats["created"] += 1
                    sp.commit()
                except Exception:
                    # Undo this asset's savepoint only and count the failure.
                    try: sp.rollback()
                    except Exception: pass
                    stats["errors"] += 1
            except Exception:
                # Parsing failure for this asset: count it and move on.
                stats["errors"] += 1
        # Commit once per page.
        db.commit()
        # Current filter: next page OR move on to the next filter
        if "<hasMoreRecords>true</hasMoreRecords>" not in r.text or new_last_id == last_id:
            current_filter_idx += 1
            last_id = None
        else:
            last_id = new_last_id
    stats["ok"] = True
    stats["mode"] = mode
    stats["msg"] = f"[{mode}] {stats['created']} créés, {stats['updated']} mis à jour ({stats['pages']} pages, {stats['errors']} erreurs, {len(tag_filters)} filtres)"
    # Remember the timestamp for the next diff sync
    if mode == "diff":
        try:
            now_iso = datetime.now(timezone.utc).isoformat()
            set_secret(db, "qualys_last_diff_sync", now_iso, "Timestamp dernier sync Qualys diff")
            db.commit()
        except Exception:
            pass
    return stats
def cancel_refresh():
    """Request cancellation of the running Qualys refresh by setting the cancel event."""
    _refresh_cancel.set()
    reply = {"ok": True, "msg": "Annulation demandée"}
    return reply
def is_refresh_running():
    """Return the module-level flag telling whether a Qualys refresh is in progress."""
    running = _refresh_running
    return running