patchcenter/app/services/qualys_service.py
Khalid MOUTAOUAKIL 662b9c3535 Fix vulns: utiliser IPs au lieu de QPS asset IDs pour API VMDR
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 22:42:30 +02:00

438 lines
18 KiB
Python

"""Qualys service — asset search, tag management, per-server sync and vulnerability counts via the Qualys APIs."""
import re
import requests
import urllib3
from sqlalchemy import text
from .secrets_service import get_secret
urllib3.disable_warnings()
def _get_qualys_creds(db):
    """Load the Qualys connection settings from the encrypted secrets store.

    Returns a ``(url, user, password, proxy)`` tuple. A missing URL falls back
    to the EU API endpoint; the other values fall back to an empty string.
    The proxy is blanked when the ``qualys_bypass_proxy`` secret equals
    "true" (case-insensitive).
    """
    def _secret(key, default=""):
        # Treat any falsy secret (None, "") as "not configured".
        return get_secret(db, key) or default

    base_url = _secret("qualys_url", "https://qualysapi.qualys.eu")
    username = _secret("qualys_user")
    password = _secret("qualys_pass")
    proxy_url = _secret("qualys_proxy")
    skip_proxy = _secret("qualys_bypass_proxy").lower() == "true"
    return base_url, username, password, ("" if skip_proxy else proxy_url)
def parse_xml(txt, tag):
    """Return the text content of every ``<tag>...</tag>`` element in *txt*.

    This is a lightweight regex scan, not an XML parser: it only matches
    elements written without attributes whose content contains no ``<``
    (so no nested elements and no CDATA sections).

    Args:
        txt: XML text to scan. ``None`` or an empty string yields ``[]``.
        tag: Element name, matched literally.

    Returns:
        List of captured contents, in document order (possibly empty).
    """
    if not txt:
        return []
    # Escape the name so characters such as "." are not treated as regex syntax.
    name = re.escape(tag)
    return re.findall(f"<{name}>([^<]*)</{name}>", txt)
def search_assets_api(db, query, field="name", operator="CONTAINS"):
    """Search Qualys host assets live through the QPS REST API.

    Sends a single-criterion search (``field`` ``operator`` ``query``) limited
    to 200 results and parses the XML answer with ``_parse_assets_full``.

    Returns a dict with keys ``ok`` (bool), ``msg`` (str) and ``assets``
    (list of asset dicts, empty on any failure).
    """
    base_url, username, password, proxy_url = _get_qualys_creds(db)

    def _failure(message):
        return {"ok": False, "msg": message, "assets": []}

    if not username:
        return _failure("Credentials Qualys non configurés")

    criterion = {"field": field, "operator": operator, "value": query}
    payload = {"ServiceRequest": {
        "preferences": {"limitResults": 200},
        "filters": {"Criteria": [criterion]},
    }}
    proxy_map = {"https": proxy_url, "http": proxy_url} if proxy_url else None

    try:
        response = requests.post(
            f"{base_url}/qps/rest/2.0/search/am/hostasset",
            json=payload,
            auth=(username, password),
            verify=False, timeout=60, proxies=proxy_map,
            headers={"Content-Type": "application/json"},
        )
    except Exception as exc:
        return _failure(f"Erreur API: {exc}")

    # A usable answer needs both HTTP 200 and the SUCCESS marker in the body.
    if not (response.status_code == 200 and "SUCCESS" in response.text):
        return _failure(f"API HTTP {response.status_code}")

    found = _parse_assets_full(response.text)
    return {"ok": True, "msg": f"{len(found)} résultat(s)", "assets": found}
def get_all_tags_api(db):
    """Fetch tags from the Qualys QPS REST API (request capped at 1000 results).

    Returns a dict with keys ``ok`` (bool), ``msg`` (str) and ``tags``: a list
    of ``{"id", "name", "is_dynamic", "rule_type"}`` dicts. A tag is flagged
    dynamic when its ``ruleType`` element is non-empty. Entries lacking an id
    or a name are skipped.
    """
    base_url, username, password, proxy_url = _get_qualys_creds(db)
    if not username:
        return {"ok": False, "msg": "Credentials non configurés", "tags": []}

    proxy_map = {"https": proxy_url, "http": proxy_url} if proxy_url else None
    try:
        response = requests.post(
            f"{base_url}/qps/rest/2.0/search/am/tag",
            json={"ServiceRequest": {"preferences": {"limitResults": 1000}}},
            auth=(username, password),
            verify=False, timeout=60, proxies=proxy_map,
            headers={"Content-Type": "application/json"},
        )
    except Exception as exc:
        return {"ok": False, "msg": str(exc), "tags": []}

    if not (response.status_code == 200 and "SUCCESS" in response.text):
        return {"ok": False, "msg": f"HTTP {response.status_code}", "tags": []}

    def _first(fragment, element):
        # First occurrence of the element in the fragment, or "" when absent.
        values = parse_xml(fragment, element)
        return values[0] if values else ""

    collected = []
    for chunk in response.text.split("<Tag>")[1:]:
        body = chunk.partition("</Tag>")[0]
        tag_id = _first(body, "id")
        tag_name = _first(body, "name")
        rule = _first(body, "ruleType")
        if not (tag_id and tag_name):
            continue
        collected.append({
            "id": int(tag_id),
            "name": tag_name,
            "is_dynamic": bool(rule),
            "rule_type": rule,
        })
    return {"ok": True, "msg": f"{len(collected)} tags", "tags": collected}
def _parse_assets_full(text):
    """Turn a Qualys host-asset XML response into a list of enriched dicts.

    Each ``<HostAsset>`` block yields one dict with the asset id, name, short
    lowercase hostname (name up to the first dot), FQDN, address, OS, agent
    details (only read when the block has an ``<agentInfo>`` element) and the
    tag names, both as a list and as a comma-separated string.

    NOTE: the parameter name shadows the module-level sqlalchemy ``text``
    import inside this function; it is kept for interface compatibility.
    """
    def _first(fragment, element):
        # First occurrence of the element in the fragment, or "" when absent.
        hits = parse_xml(fragment, element)
        return hits[0] if hits else ""

    parsed = []
    for chunk in text.split("<HostAsset>")[1:]:
        body = chunk.partition("</HostAsset>")[0]
        asset_id = _first(body, "id")
        asset_name = _first(body, "name")
        has_agent = "<agentInfo>" in body

        if "<tags>" in body:
            tags_xml = body.split("<tags>")[1].split("</tags>")[0]
            tag_names = parse_xml(tags_xml, "name")
        else:
            tag_names = []

        parsed.append({
            "qualys_asset_id": int(asset_id) if asset_id else None,
            "name": asset_name,
            "hostname": asset_name.split(".")[0].lower() if asset_name else "",
            "fqdn": _first(body, "fqdn"),
            "ip_address": _first(body, "address"),
            "os": _first(body, "os"),
            "agent_status": _first(body, "status") if has_agent else "",
            "agent_version": _first(body, "agentVersion") if has_agent else "",
            "last_checkin": _first(body, "lastCheckedIn") if has_agent else "",
            "tags": tag_names,
            "tags_list": ", ".join(tag_names),
        })
    return parsed
def create_tag_api(db, tag_name):
    """Create a static tag in Qualys and mirror it into ``qualys_tags``.

    Args:
        db: Database session used for secrets lookup and the local upsert.
        tag_name: Name of the tag to create.

    Returns:
        ``{"ok": bool, "msg": str}``. Network, API and database errors are
        reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/create/am/tag",
            json={"ServiceRequest": {"data": {"Tag": {"name": tag_name}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}

    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}

    tid = (parse_xml(r.text, "id") or [""])[0]
    if tid:
        try:
            db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic) VALUES (:tid, :n, false)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name
"""), {"tid": int(tid), "n": tag_name})
            db.commit()
        except Exception as e:
            # Leave the session usable for the caller after a failed statement.
            db.rollback()
            return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": f"Tag '{tag_name}' créé (ID: {tid})"}
def delete_tag_api(db, qualys_tag_id):
    """Delete a tag in Qualys, then remove it and its asset links locally.

    Args:
        db: Database session used for secrets lookup and the local deletes.
        qualys_tag_id: Qualys identifier of the tag to delete.

    Returns:
        ``{"ok": bool, "msg": str}``. Network, API and database errors are
        reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/delete/am/tag/{qualys_tag_id}",
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}

    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}

    try:
        # Links are removed before the tag row itself.
        db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id})
        db.execute(text("DELETE FROM qualys_tags WHERE qualys_tag_id = :tid"), {"tid": qualys_tag_id})
        db.commit()
    except Exception as e:
        # Leave the session usable for the caller after a failed statement.
        db.rollback()
        return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": "Tag supprimé"}
def add_tag_to_asset_api(db, asset_id, tag_id):
    """Attach a tag to a host asset in Qualys and record the link locally.

    Args:
        db: Database session used for secrets lookup and the local insert.
        asset_id: Qualys host-asset identifier.
        tag_id: Qualys tag identifier.

    Returns:
        ``{"ok": bool, "msg": str}``. Network, API and database errors are
        reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"add": {"TagSimple": {"id": tag_id}}}}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}

    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}

    try:
        db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:aid, :tid) ON CONFLICT DO NOTHING
"""), {"aid": asset_id, "tid": tag_id})
        db.commit()
    except Exception as e:
        # Leave the session usable for the caller after a failed statement.
        db.rollback()
        return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": "Tag ajouté"}
def remove_tag_from_asset_api(db, asset_id, tag_id):
    """Detach a tag from a host asset in Qualys and drop the local link.

    Args:
        db: Database session used for secrets lookup and the local delete.
        asset_id: Qualys host-asset identifier.
        tag_id: Qualys tag identifier.

    Returns:
        ``{"ok": bool, "msg": str}``. Network, API and database errors are
        reported through ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": {"remove": {"TagSimple": {"id": tag_id}}}}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30, proxies=proxies,
            headers={"Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}

    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}

    try:
        db.execute(text("""
DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :aid AND qualys_tag_id = :tid
"""), {"aid": asset_id, "tid": tag_id})
        db.commit()
    except Exception as e:
        # Leave the session usable for the caller after a failed statement.
        db.rollback()
        return {"ok": False, "msg": str(e)}
    return {"ok": True, "msg": "Tag retiré"}
def resync_all_tags(db):
    """Refresh the local ``qualys_tags`` table from the Qualys API.

    Fetches the tags with ``get_all_tags_api`` and upserts each one (name,
    dynamic flag, rule type), committing once at the end. When the fetch
    fails, its error dict is returned unchanged.
    """
    fetched = get_all_tags_api(db)
    if not fetched["ok"]:
        return fetched

    upsert = text("""
INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic, rule_type)
VALUES (:tid, :n, :dyn, :rt)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name, is_dynamic = EXCLUDED.is_dynamic,
rule_type = EXCLUDED.rule_type, updated_at = now()
""")

    synced = 0
    for synced, tag in enumerate(fetched["tags"], start=1):
        params = {
            "tid": tag["id"],
            "n": tag["name"],
            "dyn": tag["is_dynamic"],
            "rt": tag.get("rule_type"),
        }
        db.execute(upsert, params)
    db.commit()
    return {"ok": True, "msg": f"{synced} tags synchronisés"}
def sync_server_qualys(db, server_id):
    """Synchronise one server's Qualys asset data and tags into the local DB.

    Steps:
      1. Load the server's hostname and stored Qualys asset id.
      2. If no asset id is stored, look the asset up by hostname and store it.
      3. Fetch the asset from Qualys by id and upsert it into ``qualys_assets``.
      4. Fill in the server's ``fqdn`` / ``os_version`` when Qualys has a value.
      5. Replace the asset's tag links with the tags present in the response.

    Args:
        db: Database session.
        server_id: Primary key of the row in ``servers``.

    Returns:
        ``{"ok": False, "msg": str}`` on failure, or
        ``{"ok": True, "msg": str, "tags": int}`` on success, where ``tags``
        is the number of tag links written.
    """
    row = db.execute(text(
        "SELECT hostname, qualys_asset_id FROM servers WHERE id = :id"
    ), {"id": server_id}).fetchone()
    if not row:
        return {"ok": False, "msg": "Serveur introuvable"}
    hostname = row.hostname
    qid = row.qualys_asset_id

    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configures (Settings)"}
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None

    # Look the asset up by hostname when no Qualys asset id is stored yet.
    if not qid:
        qid = _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies)
        if not qid:
            return {"ok": False, "msg": f"Asset '{hostname}' non trouve dans Qualys"}
        db.execute(text("UPDATE servers SET qualys_asset_id = :qid WHERE id = :id"),
                   {"qid": qid, "id": server_id})

    # Fetch the full asset, including its tags.
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "filters": {"Criteria": [
                    {"field": "id", "operator": "EQUALS", "value": str(qid)}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}"}
    if r.status_code != 200 or "SUCCESS" not in r.text:
        return {"ok": False, "msg": f"API HTTP {r.status_code}"}

    # Only the first <HostAsset> block of the response is used.
    blocks = r.text.split("<HostAsset>")
    if len(blocks) < 2:
        return {"ok": False, "msg": "Asset non trouve dans la reponse"}
    block = blocks[1].split("</HostAsset>")[0]

    fqdn = (parse_xml(block, "fqdn") or [""])[0]
    address = (parse_xml(block, "address") or [""])[0]
    os_val = (parse_xml(block, "os") or [""])[0]
    agent_status = (parse_xml(block, "status") or [""])[0] if "<agentInfo>" in block else ""
    agent_version = (parse_xml(block, "agentVersion") or [""])[0]
    last_checkin = (parse_xml(block, "lastCheckedIn") or [""])[0] or None

    # Coarse OS family derived from keywords in the OS string.
    os_family = None
    os_low = os_val.lower()
    if any(k in os_low for k in ("linux", "red hat", "centos", "debian")):
        os_family = "linux"
    elif "windows" in os_low:
        os_family = "windows"

    # Upsert the asset row.
    db.execute(text("""
INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
ON CONFLICT (qualys_asset_id) DO UPDATE SET
fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address, os=EXCLUDED.os,
os_family=EXCLUDED.os_family, agent_status=EXCLUDED.agent_status,
agent_version=EXCLUDED.agent_version, last_checkin=EXCLUDED.last_checkin, updated_at=now()
"""), {"qid": qid, "name": hostname, "hn": hostname.split(".")[0].lower(),
       "fqdn": fqdn or None, "ip": address or None, "os": os_val, "osf": os_family,
       "ast": agent_status, "av": agent_version, "lc": last_checkin, "sid": server_id})

    # Enrich the server row; empty Qualys values keep the existing column value.
    db.execute(text("""
UPDATE servers SET
fqdn = COALESCE(NULLIF(:fqdn, ''), fqdn),
os_version = COALESCE(NULLIF(:os, ''), os_version)
WHERE id = :id
"""), {"fqdn": fqdn, "os": os_val, "id": server_id})

    # Always drop the previous links, even when the response carries no <tags>
    # element, so tags removed in Qualys do not linger locally.
    db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"), {"qid": qid})

    tag_count = 0
    if "<tags>" in block:
        tag_block = block.split("<tags>")[1].split("</tags>")[0]
        tag_ids = parse_xml(tag_block, "id")
        tag_names = parse_xml(tag_block, "name")
        for tid, tname in zip(tag_ids, tag_names):
            # Upsert the tag itself.
            db.execute(text("""
INSERT INTO qualys_tags (qualys_tag_id, name) VALUES (:tid, :tn)
ON CONFLICT (qualys_tag_id) DO UPDATE SET name=EXCLUDED.name, updated_at=now()
"""), {"tid": int(tid), "tn": tname})
            # Link the asset to the tag.
            db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:qid, :tid) ON CONFLICT DO NOTHING
"""), {"qid": qid, "tid": int(tid)})
            tag_count += 1

    db.commit()
    return {"ok": True, "msg": f"Synchro OK — {tag_count} tags", "tags": tag_count}
def _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies=None):
    """Look up a Qualys host-asset id by hostname.

    The API is queried with a CONTAINS filter (at most 5 results), which can
    return unrelated assets whose name merely includes the hostname (for
    example "srv10" when searching "srv1"). Candidates are ranked:

      1. asset name equal to ``hostname`` (case-insensitive),
      2. same short hostname (the part before the first dot),
      3. the first asset returned (the historical behaviour).

    Args:
        qualys_url: Base URL of the Qualys API.
        qualys_user: API user name.
        qualys_pass: API password.
        hostname: Hostname to search for.
        proxies: Optional ``requests`` proxies mapping.

    Returns:
        The asset id as ``int``, or ``None`` when nothing is found or the
        lookup fails for any reason (the lookup is deliberately best-effort).
    """
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 5},
                "filters": {"Criteria": [
                    {"field": "name", "operator": "CONTAINS", "value": hostname}
                ]}
            }},
            auth=(qualys_user, qualys_pass),
            verify=False, timeout=60, proxies=proxies,
            headers={"Content-Type": "application/json"}
        )
        if r.status_code != 200 or "SUCCESS" not in r.text:
            return None

        wanted_full = (hostname or "").lower()
        wanted_short = wanted_full.split(".")[0]
        short_match = None
        first_seen = None
        for block in r.text.split("<HostAsset>")[1:]:
            block = block.split("</HostAsset>")[0]
            aid = (parse_xml(block, "id") or [""])[0]
            if not aid.isdigit():
                continue
            candidate = int(aid)
            if first_seen is None:
                first_seen = candidate
            name = (parse_xml(block, "name") or [""])[0].lower()
            if wanted_full and name == wanted_full:
                # Best possible match; no need to look further.
                return candidate
            if short_match is None and wanted_short and name.split(".")[0] == wanted_short:
                short_match = candidate

        if short_match is not None:
            return short_match
        if first_seen is not None:
            return first_seen

        # No usable <HostAsset> block: fall back to the first id in the response.
        ids = parse_xml(r.text, "id")
        if ids:
            return int(ids[0])
    except Exception:
        # Best-effort lookup: any network or parsing error means "not found".
        pass
    return None
def get_vuln_counts(db, ip_list):
    """Count active severity 3/4/5 detections per host for a set of IPs.

    Queries the Qualys host detection endpoint for detections with status
    New, Active or Re-Opened and tallies them per host IP.

    Args:
        db: Database session used for the secrets lookup.
        ip_list: IPs as a comma-separated string.

    Returns:
        ``{ip: {"severity3", "severity4", "severity5", "total", "confirmed",
        "potential"}}``. An empty dict is returned when credentials or IPs
        are missing, on request failure, or on a non-200 response.
    """
    base_url, username, password, proxy_url = _get_qualys_creds(db)
    if not (username and ip_list):
        return {}

    proxy_map = {"https": proxy_url, "http": proxy_url} if proxy_url else None
    form = {
        "action": "list",
        "ips": str(ip_list),
        "severities": "3,4,5",
        "status": "New,Active,Re-Opened",
        "show_results": "0",
        "output_format": "XML",
    }
    try:
        response = requests.post(
            f"{base_url}/api/2.0/fo/asset/host/vm/detection/",
            data=form,
            auth=(username, password),
            verify=False, timeout=120, proxies=proxy_map,
            headers={"X-Requested-With": "Python"},
        )
    except Exception:
        return {}
    if response.status_code != 200:
        return {}

    severity_keys = {3: "severity3", 4: "severity4", 5: "severity5"}
    type_keys = {"Confirmed": "confirmed", "Potential": "potential"}

    per_host = {}
    for host_chunk in response.text.split("<HOST>")[1:]:
        host_xml = host_chunk.partition("</HOST>")[0]
        ips = parse_xml(host_xml, "IP")
        ip = ips[0] if ips else ""
        if not ip:
            continue

        tally = {"severity3": 0, "severity4": 0, "severity5": 0,
                 "total": 0, "confirmed": 0, "potential": 0}
        for det_chunk in host_xml.split("<DETECTION>")[1:]:
            det_xml = det_chunk.partition("</DETECTION>")[0]
            raw_severity = (parse_xml(det_xml, "SEVERITY") or ["0"])[0]
            kind = (parse_xml(det_xml, "TYPE") or [""])[0]
            level = int(raw_severity) if raw_severity.isdigit() else 0
            # Only severity >= 3 Confirmed/Potential detections are counted.
            if level < 3 or kind not in type_keys:
                continue
            tally["total"] += 1
            bucket = severity_keys.get(level)
            if bucket:
                tally[bucket] += 1
            tally[type_keys[kind]] += 1
        # A later <HOST> block with the same IP replaces the earlier tally.
        per_host[ip] = tally
    return per_host