"""Qualys service: sync the tags of a server through the API, with an in-memory cache."""
import logging
import re

import requests
import urllib3
from sqlalchemy import text

from .secrets_service import get_secret
from . import cache as _cache

# SECURITY NOTE(review): every request in this module is sent with
# verify=False (no TLS certificate validation) and the call below silences the
# resulting urllib3 warnings. Left unchanged because deployments may depend on
# it; consider a configurable CA bundle instead.
urllib3.disable_warnings()

logger = logging.getLogger(__name__)

CACHE_TTL = 600  # 10 minutes

_JSON_HEADERS = {"Content-Type": "application/json"}

# NOTE(review): the XML element names used below ("HostAsset", "Tag",
# "agentInfo", "tags", "HOST", "DETECTION", "AgentActKey") were restored from
# the Qualys API response schema; the delimiter literals in this file had been
# reduced to empty strings, which makes str.split() raise ValueError.
# Confirm them against a live response.


def _get_qualys_creds(db):
    """Read the Qualys connection settings from the encrypted secrets.

    Returns a ``(url, user, password, proxy)`` tuple. ``url`` falls back to
    the EU platform URL, the other values to ``""``. ``proxy`` is forced to
    ``""`` when the ``qualys_bypass_proxy`` secret equals "true"
    (case-insensitive).
    """
    url = get_secret(db, "qualys_url") or "https://qualysapi.qualys.eu"
    user = get_secret(db, "qualys_user") or ""
    pwd = get_secret(db, "qualys_pass") or ""
    proxy = get_secret(db, "qualys_proxy") or ""
    bypass = (get_secret(db, "qualys_bypass_proxy") or "").lower() == "true"
    if bypass:
        proxy = ""
    # Endpoints are appended as "/qps/..." so a trailing slash would double up.
    return url.rstrip("/"), user, pwd, proxy


def _build_proxies(proxy):
    """Return the ``proxies`` mapping for requests, or None without a proxy."""
    return {"https": proxy, "http": proxy} if proxy else None


def _is_success(response):
    """Return True for an HTTP 200 response whose body contains "SUCCESS"."""
    return response.status_code == 200 and "SUCCESS" in response.text


def parse_xml(txt, tag):
    """Return the text following every ``<tag>`` opening element in *txt*.

    Each value runs up to the next ``<`` character, so elements that contain
    child elements yield only their leading text.
    """
    return re.findall(f"<{re.escape(tag)}>([^<]*)", txt)


def _first(txt, tag, default=""):
    """Return the first ``<tag>`` value in *txt*, or *default* if absent."""
    values = parse_xml(txt, tag)
    return values[0] if values else default


def _xml_blocks(txt, tag):
    """Return the inner text of every ``<tag>...</tag>`` element in *txt*."""
    closing = f"</{tag}>"
    return [chunk.split(closing)[0] for chunk in txt.split(f"<{tag}>")[1:]]


def _agent_info(block):
    """Extract ``(status, version, last_checkin)`` from the agentInfo section.

    All three values are ``""`` when the asset has no agentInfo section.
    Reading from the section only avoids picking up a ``<status>`` element
    that belongs to another part of the asset.
    """
    start = block.find("<agentInfo>")
    if start == -1:
        return "", "", ""
    end = block.find("</agentInfo>", start)
    section = block[start:end] if end != -1 else block[start:]
    return (
        _first(section, "status"),
        _first(section, "agentVersion"),
        _first(section, "lastCheckedIn"),
    )


def _tag_section(block):
    """Return the inner text of the ``<tags>`` section, or None if absent."""
    if "<tags>" not in block:
        return None
    return block.split("<tags>")[1].split("</tags>")[0]


def _os_family(os_val):
    """Map an OS description to "linux", "windows" or None."""
    lowered = os_val.lower()
    if any(k in lowered for k in ("linux", "red hat", "centos", "debian")):
        return "linux"
    if "windows" in lowered:
        return "windows"
    return None


def search_assets_api(db, query, field="name", operator="CONTAINS", force_refresh=False):
    """Search host assets through the Qualys API, cached for CACHE_TTL seconds.

    Args:
        db: database handle used to read the Qualys secrets.
        query: value to search for.
        field: asset field the criterion applies to.
        operator: criterion operator sent to the API.
        force_refresh: skip the cache lookup when True.

    Returns:
        A dict with ``ok``, ``msg`` and ``assets`` keys; successful and cached
        results also carry ``from_cache``.
    """
    # The operator is part of the key so that an EQUALS and a CONTAINS search
    # on the same value do not share a cache entry.
    cache_key = f"qualys:search:{field}:{operator}:{query}"
    if not force_refresh:
        cached = _cache.get(cache_key)
        if cached is not None:
            # Work on a copy: annotating the stored dict in place would append
            # another " (cache)" on every hit if the cache returns the stored
            # object itself.
            hit = dict(cached)
            hit["msg"] = hit.get("msg", "") + " (cache)"
            hit["from_cache"] = True
            return hit

    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configurés", "assets": []}
    proxies = _build_proxies(qualys_proxy)

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 200},
                "filters": {"Criteria": [
                    {"field": field, "operator": operator, "value": query}
                ]}
            }},
            auth=(qualys_user, qualys_pass), verify=False, timeout=60,
            proxies=proxies, headers=_JSON_HEADERS,
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}", "assets": []}

    if not _is_success(r):
        return {"ok": False, "msg": f"API HTTP {r.status_code}", "assets": []}

    assets = _parse_assets_full(r.text)
    result = {"ok": True, "msg": f"{len(assets)} résultat(s)",
              "assets": assets, "from_cache": False}
    _cache.set(cache_key, result, CACHE_TTL)
    return result


def get_all_tags_api(db):
    """Fetch every tag from the Qualys API (up to 1000 results).

    Returns:
        A dict with ``ok``, ``msg`` and ``tags``; each tag is a dict with
        ``id``, ``name``, ``is_dynamic`` and ``rule_type``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés", "tags": []}
    proxies = _build_proxies(qualys_proxy)

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/tag",
            json={"ServiceRequest": {"preferences": {"limitResults": 1000}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=60,
            proxies=proxies, headers=_JSON_HEADERS,
        )
    except Exception as e:
        return {"ok": False, "msg": str(e), "tags": []}

    if not _is_success(r):
        return {"ok": False, "msg": f"HTTP {r.status_code}", "tags": []}

    tags = []
    for block in _xml_blocks(r.text, "Tag"):
        tid = _first(block, "id")
        tname = _first(block, "name")
        rule_type = _first(block, "ruleType")
        if tid and tname:
            # A tag is considered dynamic as soon as it carries a rule type.
            tags.append({"id": int(tid), "name": tname,
                         "is_dynamic": bool(rule_type), "rule_type": rule_type})
    return {"ok": True, "msg": f"{len(tags)} tags", "tags": tags}


def _parse_assets_full(text):
    """Parse a Qualys host asset XML response into a list of enriched dicts.

    Each dict holds ``qualys_asset_id``, ``name``, ``hostname`` (lower-cased
    first label of the name), ``fqdn``, ``ip_address``, ``os``,
    ``agent_status``, ``agent_version``, ``last_checkin``, ``tags`` and
    ``tags_list``. Missing values are ``""`` (``None`` for the asset id).
    """
    assets = []
    for block in _xml_blocks(text, "HostAsset"):
        aid = _first(block, "id")
        name = _first(block, "name")
        agent_status, agent_version, last_checkin = _agent_info(block)

        section = _tag_section(block)
        tags = parse_xml(section, "name") if section is not None else []

        assets.append({
            "qualys_asset_id": int(aid) if aid else None,
            "name": name,
            "hostname": name.split(".")[0].lower() if name else "",
            "fqdn": _first(block, "fqdn"),
            "ip_address": _first(block, "address"),
            "os": _first(block, "os"),
            "agent_status": agent_status,
            "agent_version": agent_version,
            "last_checkin": last_checkin,
            "tags": tags,
            "tags_list": ", ".join(tags),
        })
    return assets


def create_tag_api(db, tag_name):
    """Create a static tag in Qualys and mirror it into ``qualys_tags``.

    Returns a dict with ``ok`` and ``msg``. Any exception (API or database)
    is reported through ``msg`` rather than raised.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = _build_proxies(qualys_proxy)
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/create/am/tag",
            json={"ServiceRequest": {"data": {"Tag": {"name": tag_name}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30,
            proxies=proxies, headers=_JSON_HEADERS)
        if _is_success(r):
            tid = _first(r.text, "id")
            if tid:
                db.execute(text("""
                    INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic)
                    VALUES (:tid, :n, false)
                    ON CONFLICT (qualys_tag_id) DO UPDATE SET name = EXCLUDED.name
                """), {"tid": int(tid), "n": tag_name})
                db.commit()
            return {"ok": True, "msg": f"Tag '{tag_name}' créé (ID: {tid})"}
        return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}
    except Exception as e:
        return {"ok": False, "msg": str(e)}


def delete_tag_api(db, qualys_tag_id):
    """Delete a tag in Qualys, then remove it and its asset links locally.

    Returns a dict with ``ok`` and ``msg``; exceptions are reported in ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = _build_proxies(qualys_proxy)
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/delete/am/tag/{qualys_tag_id}",
            auth=(qualys_user, qualys_pass), verify=False, timeout=30,
            proxies=proxies, headers=_JSON_HEADERS)
        if _is_success(r):
            # Links first, then the tag row itself.
            db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_tag_id = :tid"),
                       {"tid": qualys_tag_id})
            db.execute(text("DELETE FROM qualys_tags WHERE qualys_tag_id = :tid"),
                       {"tid": qualys_tag_id})
            db.commit()
            return {"ok": True, "msg": "Tag supprimé"}
        return {"ok": False, "msg": f"Erreur API: {r.text[:200]}"}
    except Exception as e:
        return {"ok": False, "msg": str(e)}


def add_tag_to_asset_api(db, asset_id, tag_id):
    """Attach a tag to an asset in Qualys and record the link locally.

    Returns a dict with ``ok`` and ``msg``; exceptions are reported in ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = _build_proxies(qualys_proxy)
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": {
                "add": {"TagSimple": {"id": tag_id}}}}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30,
            proxies=proxies, headers=_JSON_HEADERS)
        if _is_success(r):
            db.execute(text("""
                INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
                VALUES (:aid, :tid)
                ON CONFLICT DO NOTHING
            """), {"aid": asset_id, "tid": tag_id})
            db.commit()
            return {"ok": True, "msg": "Tag ajouté"}
        return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}
    except Exception as e:
        return {"ok": False, "msg": str(e)}


def remove_tag_from_asset_api(db, asset_id, tag_id):
    """Detach a tag from an asset in Qualys and delete the local link.

    Returns a dict with ``ok`` and ``msg``; exceptions are reported in ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials non configurés"}
    proxies = _build_proxies(qualys_proxy)
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/update/am/hostasset/{asset_id}",
            json={"ServiceRequest": {"data": {"HostAsset": {"tags": {
                "remove": {"TagSimple": {"id": tag_id}}}}}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30,
            proxies=proxies, headers=_JSON_HEADERS)
        if _is_success(r):
            db.execute(text("""
                DELETE FROM qualys_asset_tags
                WHERE qualys_asset_id = :aid AND qualys_tag_id = :tid
            """), {"aid": asset_id, "tid": tag_id})
            db.commit()
            return {"ok": True, "msg": "Tag retiré"}
        return {"ok": False, "msg": f"Erreur: {r.text[:200]}"}
    except Exception as e:
        return {"ok": False, "msg": str(e)}


def resync_all_tags(db):
    """Upsert every tag returned by the Qualys API into ``qualys_tags``.

    Returns the failed API result unchanged, otherwise a dict with ``ok`` and
    a ``msg`` giving the number of tags written.
    """
    result = get_all_tags_api(db)
    if not result["ok"]:
        return result
    count = 0
    for t in result["tags"]:
        db.execute(text("""
            INSERT INTO qualys_tags (qualys_tag_id, name, is_dynamic, rule_type)
            VALUES (:tid, :n, :dyn, :rt)
            ON CONFLICT (qualys_tag_id) DO UPDATE SET
                name = EXCLUDED.name,
                is_dynamic = EXCLUDED.is_dynamic,
                rule_type = EXCLUDED.rule_type,
                updated_at = now()
        """), {"tid": t["id"], "n": t["name"],
               "dyn": t["is_dynamic"], "rt": t.get("rule_type")})
        count += 1
    db.commit()
    return {"ok": True, "msg": f"{count} tags synchronisés"}


def sync_server_qualys(db, server_id):
    """Sync the Qualys asset data and tags of one server.

    Looks the asset up by hostname when the server has no ``qualys_asset_id``
    yet, upserts the asset into ``qualys_assets``, enriches the ``servers``
    row, and replaces the asset's tag links when the response has a tags
    section.

    Returns:
        A dict with ``ok`` and ``msg``; on success also ``tags`` (link count).
    """
    row = db.execute(text(
        "SELECT hostname, qualys_asset_id FROM servers WHERE id = :id"
    ), {"id": server_id}).fetchone()
    if not row:
        return {"ok": False, "msg": "Serveur introuvable"}

    hostname = row.hostname
    qid = row.qualys_asset_id

    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configures (Settings)"}
    proxies = _build_proxies(qualys_proxy)

    # Look the asset up by hostname when no qualys_asset_id is stored yet.
    if not qid:
        qid = _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass,
                                      hostname, proxies)
        if not qid:
            return {"ok": False, "msg": f"Asset '{hostname}' non trouve dans Qualys"}
        db.execute(text("UPDATE servers SET qualys_asset_id = :qid WHERE id = :id"),
                   {"qid": qid, "id": server_id})

    # Fetch the full asset, tags included.
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "filters": {"Criteria": [
                    {"field": "id", "operator": "EQUALS", "value": str(qid)}
                ]}
            }},
            auth=(qualys_user, qualys_pass), verify=False, timeout=60,
            proxies=proxies, headers=_JSON_HEADERS,
        )
    except Exception as e:
        return {"ok": False, "msg": f"Erreur API: {e}"}

    if not _is_success(r):
        return {"ok": False, "msg": f"API HTTP {r.status_code}"}

    # Only the first asset of the response is used.
    blocks = _xml_blocks(r.text, "HostAsset")
    if not blocks:
        return {"ok": False, "msg": "Asset non trouve dans la reponse"}
    block = blocks[0]

    fqdn = _first(block, "fqdn")
    address = _first(block, "address")
    os_val = _first(block, "os")
    agent_status, agent_version, last_checkin = _agent_info(block)
    last_checkin = last_checkin or None
    os_family = _os_family(os_val)

    # Upsert the asset row.
    db.execute(text("""
        INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os,
            os_family, agent_status, agent_version, last_checkin, server_id)
        VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
        ON CONFLICT (qualys_asset_id) DO UPDATE SET
            fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address, os=EXCLUDED.os,
            os_family=EXCLUDED.os_family, agent_status=EXCLUDED.agent_status,
            agent_version=EXCLUDED.agent_version, last_checkin=EXCLUDED.last_checkin,
            updated_at=now()
    """), {"qid": qid, "name": hostname, "hn": hostname.split(".")[0].lower(),
           "fqdn": fqdn or None, "ip": address or None, "os": os_val,
           "osf": os_family, "ast": agent_status, "av": agent_version,
           "lc": last_checkin, "sid": server_id})

    # Enrich the server row; empty values keep what is already stored.
    db.execute(text("""
        UPDATE servers SET
            fqdn = COALESCE(NULLIF(:fqdn, ''), fqdn),
            os_version = COALESCE(NULLIF(:os, ''), os_version)
        WHERE id = :id
    """), {"fqdn": fqdn, "os": os_val, "id": server_id})

    # Tags: existing links are replaced only when the response has a tags
    # section; without one the stored links are left as they are.
    tag_count = 0
    tag_block = _tag_section(block)
    if tag_block is not None:
        tag_ids = parse_xml(tag_block, "id")
        tag_names = parse_xml(tag_block, "name")

        db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"),
                   {"qid": qid})

        for tid, tname in zip(tag_ids, tag_names):
            db.execute(text("""
                INSERT INTO qualys_tags (qualys_tag_id, name)
                VALUES (:tid, :tn)
                ON CONFLICT (qualys_tag_id) DO UPDATE SET name=EXCLUDED.name, updated_at=now()
            """), {"tid": int(tid), "tn": tname})
            db.execute(text("""
                INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
                VALUES (:qid, :tid)
                ON CONFLICT DO NOTHING
            """), {"qid": qid, "tid": int(tid)})
            tag_count += 1

    db.commit()
    return {"ok": True, "msg": f"Synchro OK — {tag_count} tags", "tags": tag_count}


def _find_asset_by_hostname(qualys_url, qualys_user, qualys_pass, hostname, proxies=None):
    """Return the Qualys asset id matching *hostname*, or None.

    The API search uses CONTAINS, so it can return several assets (for example
    "web10" for "web1"). An asset whose short hostname equals the short form
    of *hostname* is preferred; otherwise the first id of the response is
    returned. Never raises: failures are logged and reported as None.
    """
    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {
                "preferences": {"limitResults": 5},
                "filters": {"Criteria": [
                    {"field": "name", "operator": "CONTAINS", "value": hostname}
                ]}
            }},
            auth=(qualys_user, qualys_pass), verify=False, timeout=60,
            proxies=proxies, headers=_JSON_HEADERS,
        )
        if not _is_success(r):
            return None

        wanted = hostname.split(".")[0].lower()
        for asset in _parse_assets_full(r.text):
            if asset["qualys_asset_id"] is not None and asset["hostname"] == wanted:
                return asset["qualys_asset_id"]

        ids = parse_xml(r.text, "id")
        if ids:
            return int(ids[0])
    except Exception:
        # Best-effort lookup: keep returning None, but leave a trace.
        logger.warning("Qualys asset lookup failed for %r", hostname, exc_info=True)
    return None


def get_vuln_counts(db, ip_list, force_refresh=False):
    """Count active severity 3, 4 and 5 detections for a list of IPs.

    Args:
        db: database handle used to read the Qualys secrets.
        ip_list: IPs as a comma-separated string.
        force_refresh: skip the cache lookup when True.

    Returns:
        ``{ip: {severity3, severity4, severity5, total, confirmed,
        potential}}``; an empty dict on missing credentials, empty input,
        request failure or a non-200 response. Results are cached for
        CACHE_TTL seconds.
    """
    cache_key = f"qualys:vulns:{ip_list}"
    if not force_refresh:
        cached = _cache.get(cache_key)
        if cached is not None:
            return cached

    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user or not ip_list:
        return {}
    proxies = _build_proxies(qualys_proxy)

    try:
        r = requests.post(
            f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/",
            data={
                "action": "list",
                "ips": str(ip_list),
                "severities": "3,4,5",
                "status": "New,Active,Re-Opened",
                "show_results": "0",
                "output_format": "XML",
            },
            auth=(qualys_user, qualys_pass), verify=False, timeout=120,
            proxies=proxies, headers={"X-Requested-With": "Python"},
        )
    except Exception:
        logger.warning("Qualys detection request failed", exc_info=True)
        return {}

    if r.status_code != 200:
        return {}

    results = {}
    for host_block in _xml_blocks(r.text, "HOST"):
        host_ip = _first(host_block, "IP")
        if not host_ip:
            continue

        counts = {"severity3": 0, "severity4": 0, "severity5": 0,
                  "total": 0, "confirmed": 0, "potential": 0}

        for det_block in _xml_blocks(host_block, "DETECTION"):
            severity = _first(det_block, "SEVERITY", "0")
            det_type = _first(det_block, "TYPE")
            sev = int(severity) if severity.isdigit() else 0

            if sev < 3:
                continue
            if det_type not in ("Confirmed", "Potential"):
                continue

            counts["total"] += 1
            if sev == 3:
                counts["severity3"] += 1
            elif sev == 4:
                counts["severity4"] += 1
            elif sev == 5:
                counts["severity5"] += 1

            if det_type == "Confirmed":
                counts["confirmed"] += 1
            else:
                counts["potential"] += 1

        results[host_ip] = counts

    _cache.set(cache_key, results, CACHE_TTL)
    return results


def get_activation_keys(db):
    """Fetch the Qualys agent activation keys (cached for one hour).

    Returns a list of dicts with ``id``, ``key``, ``status``, ``title``,
    ``used`` and ``type`` (all strings); an empty list on missing credentials,
    request failure or a non-200 response.
    """
    cache_key = "qualys:actkeys"
    cached = _cache.get(cache_key)
    if cached is not None:
        return cached

    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return []
    proxies = _build_proxies(qualys_proxy)

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/1.0/search/ca/agentactkey",
            json={"ServiceRequest": {"preferences": {"limitResults": 20}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=30,
            proxies=proxies,
            headers={"X-Requested-With": "Python", "Content-Type": "application/json"})
    except Exception:
        logger.warning("Qualys activation key request failed", exc_info=True)
        return []

    if r.status_code != 200:
        return []

    keys = [
        {
            "id": _first(block, "id"),
            "key": _first(block, "activationKey"),
            "status": _first(block, "status"),
            "title": _first(block, "title"),
            "used": _first(block, "countUsed", "0"),
            "type": _first(block, "type"),
        }
        for block in _xml_blocks(r.text, "AgentActKey")
    ]

    _cache.set(cache_key, keys, 3600)  # 1 hour for the keys
    return keys


def get_agents_summary(db):
    """Summarise the installed agents from the assets stored in the database.

    Returns a dict with ``statuses`` (rows of agent_status, cnt, with_version),
    ``versions`` (top 20 rows of agent_version, cnt), ``total_assets``,
    ``active`` and ``inactive`` counts.
    """
    rows = db.execute(text("""
        SELECT agent_status, COUNT(*) as cnt,
            COUNT(*) FILTER (WHERE agent_version IS NOT NULL AND agent_version != '') as with_version
        FROM qualys_assets
        WHERE agent_status IS NOT NULL AND agent_status != ''
        GROUP BY agent_status ORDER BY cnt DESC
    """)).fetchall()

    versions = db.execute(text("""
        SELECT agent_version, COUNT(*) as cnt
        FROM qualys_assets
        WHERE agent_version IS NOT NULL AND agent_version != ''
        GROUP BY agent_version ORDER BY cnt DESC LIMIT 20
    """)).fetchall()

    total = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar()
    # "inactive" contains "active", hence the explicit exclusion.
    active = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%active%%' AND agent_status NOT ILIKE '%%inactive%%'")).scalar()
    inactive = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%%inactive%%'")).scalar()

    return {"statuses": rows, "versions": versions,
            "total_assets": total, "active": active, "inactive": inactive}


def invalidate_search_cache():
    """Drop every cached Qualys search and vulnerability-count entry."""
    _cache.clear_prefix("qualys:search:")
    _cache.clear_prefix("qualys:vulns:")


def get_cache_stats():
    """Return the statistics reported by the cache module."""
    return _cache.stats()


def refresh_all_agents(db):
    """Refresh every asset from the Qualys QPS API in one bulk request.

    Fetches up to 1000 host assets, then inserts or updates each one in
    ``qualys_assets``, linking it to the server with the same hostname
    (case-insensitive) when there is one.

    Returns:
        A dict with ``created``, ``updated`` and ``errors`` counters plus
        ``ok`` and ``msg``; on credential or API failure, only ``ok`` and
        ``msg``.
    """
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    if not qualys_user:
        return {"ok": False, "msg": "Credentials Qualys non configurés"}
    proxies = _build_proxies(qualys_proxy)
    stats = {"created": 0, "updated": 0, "errors": 0}

    try:
        r = requests.post(
            f"{qualys_url}/qps/rest/2.0/search/am/hostasset",
            json={"ServiceRequest": {"preferences": {"limitResults": 1000}}},
            auth=(qualys_user, qualys_pass), verify=False, timeout=120,
            proxies=proxies,
            headers={"X-Requested-With": "PatchCenter", "Content-Type": "application/json"})
    except Exception as e:
        return {"ok": False, "msg": str(e)}

    if not _is_success(r):
        return {"ok": False, "msg": f"API HTTP {r.status_code}"}

    # NOTE(review): a database error inside the loop is only counted; if the
    # backend aborts the surrounding transaction on error, the following
    # statements will fail too. Consider a savepoint per asset.
    for block in _xml_blocks(r.text, "HostAsset"):
        try:
            # Raises ValueError (counted as an error) when the id is missing.
            asset_id = int(_first(block, "id"))
            name = _first(block, "name")
            hostname = name.split(".")[0].lower() if name else ""
            address = _first(block, "address")
            fqdn = _first(block, "fqdn")
            os_val = _first(block, "os")
            agent_status, agent_version, last_checkin = _agent_info(block)

            # Match the asset to a known server by hostname.
            srv = db.execute(
                text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"),
                {"h": hostname}).fetchone()

            params = {"qid": asset_id, "name": name, "hn": hostname,
                      "fqdn": fqdn or None, "ip": address or None, "os": os_val,
                      "osf": _os_family(os_val), "ast": agent_status,
                      "av": agent_version, "lc": last_checkin or None,
                      "sid": srv.id if srv else None}

            existing = db.execute(
                text("SELECT id FROM qualys_assets WHERE qualys_asset_id=:qid"),
                {"qid": asset_id}).fetchone()

            if existing:
                db.execute(text("""UPDATE qualys_assets SET name=:name, hostname=:hn, fqdn=:fqdn,
                    ip_address=:ip, os=:os, os_family=:osf, agent_status=:ast,
                    agent_version=:av, last_checkin=:lc, server_id=:sid, updated_at=now()
                    WHERE qualys_asset_id=:qid"""), params)
                stats["updated"] += 1
            else:
                db.execute(text("""INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn,
                    ip_address, os, os_family, agent_status, agent_version, last_checkin, server_id)
                    VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)"""), params)
                stats["created"] += 1
        except Exception:
            logger.warning("Failed to refresh a Qualys asset", exc_info=True)
            stats["errors"] += 1

    db.commit()
    stats["ok"] = True
    stats["msg"] = f"{stats['created']} créés, {stats['updated']} mis à jour"
    return stats