patchcenter/app/services/qualys_service.py
Khalid MOUTAOUAKIL c139dfbaa2 Cache mémoire 10min pour Qualys API, bouton Resync temps réel, page Agents (activation keys + versions)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 23:04:48 +02:00

531 lines
21 KiB
Python

"""Service Qualys — sync tags pour un serveur via API + cache memoire"""
import re
import requests
import urllib3
from sqlalchemy import text
from .secrets_service import get_secret
from . import cache as _cache
# The Qualys requests in this module are sent with verify=False; this silences
# the warnings urllib3 would otherwise emit.
urllib3.disable_warnings()
# Lifetime handed to _cache.set() for search and vulnerability results.
CACHE_TTL = 600  # 10 minutes
def _get_qualys_creds(db):
    """Load the Qualys connection settings through ``get_secret``.

    Returns a ``(url, user, password, proxy)`` tuple. Missing values come back
    as empty strings, except the URL which falls back to the EU API endpoint.
    The proxy is blanked when the ``qualys_bypass_proxy`` secret equals
    ``"true"`` (case-insensitive).
    """
    def _secret(key, fallback=""):
        # Treat any falsy stored value as "not configured".
        return get_secret(db, key) or fallback

    base_url = _secret("qualys_url", "https://qualysapi.qualys.eu")
    username = _secret("qualys_user")
    password = _secret("qualys_pass")
    proxy_url = _secret("qualys_proxy")
    bypass_flag = _secret("qualys_bypass_proxy").lower()
    return base_url, username, password, ("" if bypass_flag == "true" else proxy_url)
def parse_xml(txt, tag):
    """Return the text of every ``<tag>...</tag>`` element found in *txt*.

    This is a flat regex scan, not an XML parser: only elements written
    without attributes and whose content contains no ``<`` are matched, and
    the content is returned verbatim (entities are not decoded).

    Args:
        txt: XML text to scan.
        tag: Literal element name.

    Returns:
        List of matched contents in document order; empty when none match.
    """
    # Escape the name so characters that are legal in XML names but special in
    # regexes (".", "-") match literally instead of acting as wildcards.
    name = re.escape(tag)
    return re.findall(f"<{name}>([^<]*)</{name}>", txt)
def search_assets_api(db, query, field="name", operator="CONTAINS", force_refresh=False):
    """Search Qualys host assets through the API, with a 10-minute memory cache.

    Args:
        db: Database handle passed to ``_get_qualys_creds``.
        query: Value to match.
        field: Asset field the criterion applies to.
        operator: Qualys filter operator for the criterion.
        force_refresh: When true, skip the cache lookup and query Qualys.

    Returns:
        A dict with ``ok``, ``msg`` and ``assets``; results of a successful
        search also carry ``from_cache``. Failures are never cached.
    """
    # The operator is part of the key: an EQUALS and a CONTAINS search on the
    # same value return different asset sets and must not share an entry.
    cache_key = f"qualys:search:{field}:{operator}:{query}"
    if not force_refresh:
        cached = _cache.get(cache_key)
        if cached is not None:
            # Annotate a copy. Mutating the stored dict would append another
            # " (cache)" on every hit and flip from_cache on the cached entry.
            return {
                **cached,
                "msg": cached.get("msg", "") + " (cache)",
                "from_cache": True,
            }
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configurés", "assets": []}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        # NOTE(review): verify=False disables TLS certificate checks while
        # basic-auth credentials are sent; confirm this is intentional.
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 200},
                "filters": {"Criteria": [
                    {"field": field, "operator": operator, "value": query}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}", "assets": []}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"API HTTP {r.status_code}", "assets": []}
    assets = _parse_assets_full(r.text)
    result = {"ok": True, "msg": f"{len(assets)} résultat(s)", "assets": assets, "from_cache": False}
    # Store a separate top-level dict so the caller editing its result cannot
    # alter the cached entry's msg / from_cache.
    _cache.set(cache_key, dict(result), CACHE_TTL)
    return result
def get_all_tags_api(db):
    """Fetch tags from the Qualys tag search endpoint (request limit: 1000).

    Returns a dict with ``ok``, ``msg`` and ``tags``; each tag is
    ``{"id", "name", "is_dynamic", "rule_type"}`` where ``is_dynamic`` is true
    when the tag carries a non-empty ``ruleType``.
    """
    base_url, login, secret, proxy_url = _get_qualys_creds(db)
    if not login:
        return {"ok": False, "msg": "Credentials non configurés", "tags": []}

    proxies = None
    if proxy_url:
        proxies = {"https": proxy_url, "http": proxy_url}

    try:
        response = requests.post(
            f"{base_url}/qps/rest/2.0/search/am/tag",
            json={"ServiceRequest": {"preferences": {"limitResults": 1000}}},
            auth=(login, secret),
            verify=False,
            timeout=60,
            proxies=proxies,
            headers={"Content-Type": "application/json"},
        )
    except Exception as exc:
        return {"ok": False, "msg": str(exc), "tags": []}

    if not (response.status_code == 200 and "SUCCESS" in response.text):
        return {"ok": False, "msg": f"HTTP {response.status_code}", "tags": []}

    def first(xml, tag):
        # First occurrence of the element, or "" when it is absent.
        found = parse_xml(xml, tag)
        return found[0] if found else ""

    tags = []
    for chunk in response.text.split("<Tag>")[1:]:
        tag_xml = chunk.split("</Tag>")[0]
        tag_id = first(tag_xml, "id")
        tag_name = first(tag_xml, "name")
        rule = first(tag_xml, "ruleType")
        # Entries missing an id or a name are skipped.
        if not (tag_id and tag_name):
            continue
        tags.append({
            "id": int(tag_id),
            "name": tag_name,
            "is_dynamic": bool(rule),
            "rule_type": rule,
        })
    return {"ok": True, "msg": f"{len(tags)} tags", "tags": tags}
def _parse_assets_full(text):
    """Turn a Qualys host-asset XML response into a list of asset dicts.

    Each ``<HostAsset>`` section yields one dict. Scalar fields take the first
    matching element in the section (empty string when absent); agent fields
    are only read when the section contains ``<agentInfo>``; ``tags`` holds the
    ``<name>`` values found inside ``<tags>``.
    """
    def first(xml, tag):
        found = parse_xml(xml, tag)
        return found[0] if found else ""

    parsed = []
    for chunk in text.split("<HostAsset>")[1:]:
        xml = chunk.split("</HostAsset>")[0]
        asset_id = first(xml, "id")
        asset_name = first(xml, "name")
        has_agent = "<agentInfo>" in xml

        tag_names = []
        if "<tags>" in xml:
            tag_section = xml.split("<tags>")[1].split("</tags>")[0]
            tag_names = parse_xml(tag_section, "name")

        parsed.append({
            "qualys_asset_id": int(asset_id) if asset_id else None,
            "name": asset_name,
            # Short host name: first DNS label, lower-cased.
            "hostname": asset_name.split(".")[0].lower() if asset_name else "",
            "fqdn": first(xml, "fqdn"),
            "ip_address": first(xml, "address"),
            "os": first(xml, "os"),
            "agent_status": first(xml, "status") if has_agent else "",
            "agent_version": first(xml, "agentVersion") if has_agent else "",
            "last_checkin": first(xml, "lastCheckedIn") if has_agent else "",
            "tags": tag_names,
            "tags_list": ", ".join(tag_names),
        })
    return parsed
def create_tag_api(db, tag_name):
    """Create a tag in Qualys and mirror it into ``qualys_tags``.

    The tag is stored locally with ``is_dynamic = false``. The local row is
    only written when the API response contains an id. Returns
    ``{"ok": bool, "msg": str}``; any exception is reported through ``msg``.
    """
    base_url, login, secret, proxy_url = _get_qualys_creds(db)
    if not login:
        return {"ok": False, "msg": "Credentials non configurés"}

    proxies = None
    if proxy_url:
        proxies = {"https": proxy_url, "http": proxy_url}

    upsert_sql = (
        "\n"
        "INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic) VALUES (:tid, :n, false)\n"
        "ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name\n"
    )
    try:
        response = requests.post(
            f"{base_url}/qps/rest/2.0/create/am/tag",
            json={"ServiceRequest": {"data": {"Tag": {"name": tag_name}}}},
            auth=(login, secret),
            verify=False,
            timeout=30,
            proxies=proxies,
            headers={"Content-Type": "application/json"},
        )
        if response.status_code != 200 or "SUCCESS" not in response.text:
            return {"ok": False, "msg": f"Erreur API: {response.text[:200]}"}

        ids = parse_xml(response.text, "id")
        new_id = ids[0] if ids else ""
        if new_id:
            db.execute(text(upsert_sql), {"tid": int(new_id), "n": tag_name})
            db.commit()
        return {"ok": True, "msg": f"Tag '{tag_name}' créé (ID: {new_id})"}
    except Exception as exc:
        return {"ok": False, "msg": str(exc)}
def delete_tag_api(db, qualys_tag_id):
    """Delete a tag in Qualys, then remove it and its asset links locally.

    Local rows are only deleted after the API reports success. Returns
    ``{"ok": bool, "msg": str}``; any exception is reported through ``msg``.
    """
    base_url, login, secret, proxy_url = _get_qualys_creds(db)
    if not login:
        return {"ok": False, "msg": "Credentials non configurés"}

    proxies = None
    if proxy_url:
        proxies = {"https": proxy_url, "http": proxy_url}

    try:
        response = requests.post(
            f"{base_url}/qps/rest/2.0/delete/am/tag/{qualys_tag_id}",
            auth=(login, secret),
            verify=False,
            timeout=30,
            proxies=proxies,
            headers={"Content-Type": "application/json"},
        )
        if response.status_code != 200 or "SUCCESS" not in response.text:
            return {"ok": False, "msg": f"Erreur API: {response.text[:200]}"}

        params = {"tid": qualys_tag_id}
        # Links first, then the tag row itself.
        db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_tag_id = :tid"), params)
        db.execute(text("DELETE FROM qualys_tags WHERE qualys_tag_id = :tid"), params)
        db.commit()
        return {"ok": True, "msg": "Tag supprimé"}
    except Exception as exc:
        return {"ok": False, "msg": str(exc)}
def add_tag_to_asset_api(db, asset_id, tag_id):
    """Attach a tag to a host asset in Qualys and record the link locally.

    The ``qualys_asset_tags`` row is only inserted after the API reports
    success. Returns ``{"ok": bool, "msg": str}``; any exception is reported
    through ``msg``.
    """
    base_url, login, secret, proxy_url = _get_qualys_creds(db)
    if not login:
        return {"ok": False, "msg": "Credentials non configurés"}

    proxies = None
    if proxy_url:
        proxies = {"https": proxy_url, "http": proxy_url}

    link_sql = (
        "\n"
        "INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)\n"
        "VALUES (:aid, :tid) ON CONFLICT DO NOTHING\n"
    )
    try:
        tag_change = {"add": {"TagSimple": {"id": tag_id}}}
        response = requests.post(
            f"{base_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": tag_change}}}},
            auth=(login, secret),
            verify=False,
            timeout=30,
            proxies=proxies,
            headers={"Content-Type": "application/json"},
        )
        if response.status_code != 200 or "SUCCESS" not in response.text:
            return {"ok": False, "msg": f"Erreur: {response.text[:200]}"}

        db.execute(text(link_sql), {"aid": asset_id, "tid": tag_id})
        db.commit()
        return {"ok": True, "msg": "Tag ajouté"}
    except Exception as exc:
        return {"ok": False, "msg": str(exc)}
def remove_tag_from_asset_api(db, asset_id, tag_id):
    """Detach a tag from a host asset in Qualys and drop the local link.

    The ``qualys_asset_tags`` row is only deleted after the API reports
    success. Returns ``{"ok": bool, "msg": str}``; any exception is reported
    through ``msg``.
    """
    base_url, login, secret, proxy_url = _get_qualys_creds(db)
    if not login:
        return {"ok": False, "msg": "Credentials non configurés"}

    proxies = None
    if proxy_url:
        proxies = {"https": proxy_url, "http": proxy_url}

    unlink_sql = (
        "\n"
        "DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :aid AND qualys_tag_id = :tid\n"
    )
    try:
        tag_change = {"remove": {"TagSimple": {"id": tag_id}}}
        response = requests.post(
            f"{base_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": tag_change}}}},
            auth=(login, secret),
            verify=False,
            timeout=30,
            proxies=proxies,
            headers={"Content-Type": "application/json"},
        )
        if response.status_code != 200 or "SUCCESS" not in response.text:
            return {"ok": False, "msg": f"Erreur: {response.text[:200]}"}

        db.execute(text(unlink_sql), {"aid": asset_id, "tid": tag_id})
        db.commit()
        return {"ok": True, "msg": "Tag retiré"}
    except Exception as exc:
        return {"ok": False, "msg": str(exc)}
def resync_all_tags(db):
    """Upsert every tag returned by ``get_all_tags_api`` into ``qualys_tags``.

    When the API call fails, its result dict is returned unchanged. Otherwise
    all tags are upserted, committed once, and a summary dict is returned.
    """
    fetched = get_all_tags_api(db)
    if not fetched["ok"]:
        return fetched

    upsert_sql = (
        "\n"
        "INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic, rule_type)\n"
        "VALUES (:tid, :n, :dyn, :rt)\n"
        "ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name, is_dynamic = EXCLUDED.is_dynamic,\n"
        "rule_type = EXCLUDED.rule_type, updated_at = now()\n"
    )
    synced = 0
    for tag in fetched["tags"]:
        params = {
            "tid": tag["id"],
            "n": tag["name"],
            "dyn": tag["is_dynamic"],
            "rt": tag.get("rule_type"),
        }
        db.execute(text(upsert_sql), params)
        synced = synced + 1
    db.commit()
    return {"ok": True, "msg": f"{synced} tags synchronisés"}
def sync_server_qualys(db, server_id):
    """Synchronise one server's Qualys asset record and tags; return a result dict.

    Steps, as implemented below:
      1. Read ``hostname`` and ``qualys_asset_id`` from ``servers``.
      2. If no asset id is stored, resolve one with ``_find_asset_by_hostname``
         and write it to ``servers``.
      3. Fetch the asset from the Qualys host-asset search endpoint by id.
      4. Upsert ``qualys_assets``, fill ``servers.fqdn`` / ``os_version`` when
         Qualys returned non-empty values, and rebuild the asset's tag links.
      5. Commit once at the end.

    Returns ``{"ok": False, "msg": ...}`` on failure, or
    ``{"ok": True, "msg": ..., "tags": <count>}`` on success.

    NOTE(review): the failure returns that follow the UPDATE in step 2 neither
    commit nor roll back; confirm how the caller's session handles that.
    """
    row = db.execute(text(
        "SELECT hostname, qualys_asset_id FROM servers WHERE id = :id"
    ), {"id": server_id}).fetchone()
    if not row:
        return {"ok": False, "msg": "Serveur introuvable"}
    hostname = row.hostname
    qid = row.qualys_asset_id
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configures (Settings)"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    # Look the asset up by hostname when the server has no qualys_asset_id yet
    if not qid:
        qid = _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies)
        if not qid:
            return {"ok": False, "msg": f"Asset '{hostname}' non trouve dans Qualys"}
        db.execute(text("UPDATE servers SET qualys_asset_id = :qid WHERE id = :id"),
                   {"qid": qid, "id": server_id})
    # Fetch the full asset, tags included
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "filters": {"Criteria": [
                    {"field": "id", "operator": "EQUALS", "value": str(qid)}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}"}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"API HTTP {r.status_code}"}
    # Parse the asset: only the first <HostAsset> section of the response is used
    blocks = r.text.split("<HostAsset>")
    if len(blocks) < 2:
        return {"ok": False, "msg": "Asset non trouve dans la reponse"}
    block = blocks[1].split("</HostAsset>")[0]
    fqdn = (parse_xml(block, "fqdn") or [""])[0]
    address = (parse_xml(block, "address") or [""])[0]
    os_val = (parse_xml(block, "os") or [""])[0]
    agent_status = (parse_xml(block, "status") or [""])[0] if "<agentInfo>" in block else ""
    agent_version = (parse_xml(block, "agentVersion") or [""])[0]
    # An empty check-in value is stored as NULL rather than ''
    last_checkin = (parse_xml(block, "lastCheckedIn") or [""])[0] or None
    # Derive a coarse OS family from keywords in the OS string; None when nothing matches
    os_family = None
    os_low = os_val.lower()
    if any(k in os_low for k in ("linux", "red hat", "centos", "debian")):
        os_family = "linux"
    elif "windows" in os_low:
        os_family = "windows"
    # Upsert qualys_assets (on conflict, name/hostname/server_id are left as they were)
    db.execute(text("""
INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
ON CONFLICT (qualys_asset_id) DO UPDATE SET
fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address, os=EXCLUDED.os,
os_family=EXCLUDED.os_family, agent_status=EXCLUDED.agent_status,
agent_version=EXCLUDED.agent_version, last_checkin=EXCLUDED.last_checkin, updated_at=now()
"""), {"qid": qid, "name": hostname, "hn": hostname.split(".")[0].lower(),
       "fqdn": fqdn or None, "ip": address or None, "os": os_val, "osf": os_family,
       "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})
    # Enrich servers: existing values are kept when Qualys returned an empty string
    db.execute(text("""
UPDATE servers SET
fqdn = COALESCE(NULLIF(:fqdn, ''), fqdn),
os_version = COALESCE(NULLIF(:os, ''), os_version)
WHERE id = :id
"""), {"fqdn": fqdn, "os": os_val, "id": server_id})
    # Tags: links are only rebuilt when the response contains a <tags> section
    tag_count = 0
    if "<tags>" in block:
        tag_block = block.split("<tags>")[1].split("</tags>")[0]
        tag_ids = parse_xml(tag_block, "id")
        tag_names = parse_xml(tag_block, "name")
        # Drop the previous links for this asset
        db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"), {"qid": qid})
        # ids and names are paired by position in the tag section
        for tid, tname in zip(tag_ids, tag_names):
            # Upsert the tag itself
            db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name) VALUES (:tid, :tn)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name=EXCLUDED.name, updated_at=now()
"""), {"tid": int(tid), "tn": tname})
            # Asset-to-tag link
            db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:qid, :tid) ON CONFLICT DO NOTHING
"""), {"qid": qid, "tid": int(tid)})
            tag_count += 1
    db.commit()
    return {"ok": True, "msg": f"Synchro OK — {tag_count} tags", "tags": tag_count}
def _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies=None):
    """Look up a Qualys host asset id by hostname.

    The API is queried with a CONTAINS filter on ``name`` (at most 5 results),
    so the hits may include other hosts whose name merely contains *hostname*
    (``web1`` also matches ``web10``). An asset whose short host name equals
    *hostname*'s short name (first DNS label, case-insensitive) is therefore
    preferred. When no hit matches exactly, the first id in the response is
    returned, as before.

    Args:
        qualys_url: Base URL of the Qualys API.
        qualys_user: API user name.
        qualys_pass: API password.
        hostname: Host name to search for.
        proxies: Optional ``requests`` proxies mapping.

    Returns:
        The asset id as ``int``, or ``None`` when nothing was found or the
        lookup failed. This is a best-effort lookup and never raises.
    """
    import logging  # function-scope: the module has no file-level logging import

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 5},
                "filters": {"Criteria": [
                    {"field": "name", "operator": "CONTAINS", "value": hostname}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
        if r.status_code != 200 or "SUCCESS" not in r.text:
            return None
        # Prefer an exact short-name match over whatever CONTAINS listed first.
        wanted = str(hostname or "").split(".")[0].lower()
        if wanted:
            for asset in _parse_assets_full(r.text):
                if asset["qualys_asset_id"] is not None and asset["hostname"] == wanted:
                    return asset["qualys_asset_id"]
        # No exact match: fall back to the first id in the response.
        ids = parse_xml(r.text, "id")
        if ids:
            return int(ids[0])
    except Exception as exc:
        # Still best-effort, but leave a trace instead of failing silently.
        logging.getLogger(__name__).warning(
            "Qualys hostname lookup failed for %r: %s", hostname, exc)
    return None
def get_vuln_counts(db, ip_list, force_refresh=False):
    """Count active severity 3-5 detections per host for a set of IPs.

    ip_list: comma-separated IP string (it is passed through ``str()``).
    Returns ``{ip: {severity3, severity4, severity5, total, confirmed, potential}}``.
    An empty dict is returned when credentials or IPs are missing, when the
    request raises, or when the HTTP status is not 200. Parsed results are
    cached for ``CACHE_TTL`` under a key derived from ``ip_list``.
    """
    cache_key = f"qualys:vulns:{ip_list}"
    if not force_refresh:
        hit = _cache.get(cache_key)
        if hit is not None:
            return hit

    base_url, login, secret, proxy_url = _get_qualys_creds(db)
    if not (login and ip_list):
        return {}

    proxies = None
    if proxy_url:
        proxies = {"https": proxy_url, "http": proxy_url}

    try:
        form = {
            "action": "list",
            "ips": str(ip_list),
            "severities": "3,4,5",
            "status": "New,Active,Re-Opened",
            "show_results": "0",
            "output_format": "XML",
        }
        response = requests.post(
            f"{base_url}/api/2.0/fo/asset/host/vm/detection/",
            data=form,
            auth=(login, secret),
            verify=False,
            timeout=120,
            proxies=proxies,
            headers={"X-Requested-With": "Python"},
        )
    except Exception:
        return {}
    if response.status_code != 200:
        return {}

    def first(xml, tag, default=""):
        found = parse_xml(xml, tag)
        return found[0] if found else default

    per_host = {}
    for host_chunk in response.text.split("<HOST>")[1:]:
        host_xml = host_chunk.split("</HOST>")[0]
        ip = first(host_xml, "IP")
        if not ip:
            continue

        tally = {"severity3": 0, "severity4": 0, "severity5": 0,
                 "total": 0, "confirmed": 0, "potential": 0}
        for det_chunk in host_xml.split("<DETECTION>")[1:]:
            det_xml = det_chunk.split("</DETECTION>")[0]
            raw_severity = first(det_xml, "SEVERITY", "0")
            kind = first(det_xml, "TYPE")
            level = int(raw_severity) if raw_severity.isdigit() else 0
            # Only severity >= 3 detections of a known type are counted.
            if level < 3 or kind not in ("Confirmed", "Potential"):
                continue
            tally["total"] += 1
            level_key = f"severity{level}"
            if level_key in tally:
                tally[level_key] += 1
            # kind is "Confirmed" or "Potential" here, matching the tally keys.
            tally[kind.lower()] += 1
        per_host[ip] = tally

    _cache.set(cache_key, per_host, CACHE_TTL)
    return per_host
def get_activation_keys(db, force_refresh=False):
    """Fetch the Qualys agent activation keys, cached for one hour.

    Args:
        db: Database handle passed to ``_get_qualys_creds``.
        force_refresh: When true, skip the cache lookup and query Qualys
            (same convention as ``search_assets_api`` / ``get_vuln_counts``).

    Returns:
        A list of dicts with ``id``, ``key``, ``status``, ``title``, ``used``
        and ``type`` (all strings as found in the response). An empty list is
        returned, and nothing is cached, when credentials are missing or the
        request fails.
    """
    cache_key = "qualys:actkeys"
    if not force_refresh:
        cached = _cache.get(cache_key)
        if cached is not None:
            return cached
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return []
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/1.0/search/ca/agentactkey",
            json={"ServiceRequest": {"preferences": {"limitResults": 20}}},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=30, proxies=proxies,
            headers={"X-Requested-With": "Python", "Content-Type": "application/json"})
    except Exception:
        return []
    # Same success test as the other QPS calls in this module. Without it an
    # HTTP 200 error body would parse to [] and be cached for a full hour.
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return []
    keys = []
    for block in r.text.split("<AgentActKey>")[1:]:
        block = block.split("</AgentActKey>")[0]
        keys.append({
            "id": (parse_xml(block, "id") or [""])[0],
            "key": (parse_xml(block, "activationKey") or [""])[0],
            "status": (parse_xml(block, "status") or [""])[0],
            "title": (parse_xml(block, "title") or [""])[0],
            "used": (parse_xml(block, "countUsed") or ["0"])[0],
            "type": (parse_xml(block, "type") or [""])[0],
        })
    _cache.set(cache_key, keys, 3600)  # keys are kept for 1 hour
    return keys
def get_agents_summary(db):
    """Summarise agents from the ``qualys_assets`` rows stored locally.

    Returns ``{"statuses": rows, "versions": rows}``: asset counts per
    non-empty agent status (with the number that have a version), and the 20
    most frequent non-empty agent versions.
    """
    status_sql = (
        "\n"
        "SELECT agent_status, COUNT(*) as cnt,\n"
        "COUNT(*) FILTER (WHERE agent_version IS NOT NULL AND agent_version != '') as with_version\n"
        "FROM qualys_assets\n"
        "WHERE agent_status IS NOT NULL AND agent_status != ''\n"
        "GROUP BY agent_status ORDER BY cnt DESC\n"
    )
    version_sql = (
        "\n"
        "SELECT agent_version, COUNT(*) as cnt\n"
        "FROM qualys_assets\n"
        "WHERE agent_version IS NOT NULL AND agent_version != ''\n"
        "GROUP BY agent_version ORDER BY cnt DESC LIMIT 20\n"
    )
    status_rows = db.execute(text(status_sql)).fetchall()
    version_rows = db.execute(text(version_sql)).fetchall()
    return {"statuses": status_rows, "versions": version_rows}
def invalidate_search_cache():
    """Drop every cached asset-search and vulnerability-count entry."""
    for prefix in ("qualys:search:", "qualys:vulns:"):
        _cache.clear_prefix(prefix)
def get_cache_stats():
    """Return whatever the cache module reports from ``stats()``."""
    snapshot = _cache.stats()
    return snapshot