From 3d043af194f3e219d4953b03efc00ab30657ae50 Mon Sep 17 00:00:00 2001 From: Admin MPCZ Date: Sat, 25 Apr 2026 10:17:40 +0000 Subject: [PATCH] feat(qualys): page doublons + suppression API Qualys 1-clic --- app/routers/qualys.py | 33 ++++++++ app/services/qualys_service.py | 116 +++++++++++++++++++++++++++ app/templates/base.html | 1 + app/templates/qualys_duplicates.html | 107 ++++++++++++++++++++++++ 4 files changed, 257 insertions(+) create mode 100644 app/templates/qualys_duplicates.html diff --git a/app/routers/qualys.py b/app/routers/qualys.py index 9531ef6..742923c 100644 --- a/app/routers/qualys.py +++ b/app/routers/qualys.py @@ -1295,3 +1295,36 @@ async def qualys_dashboard_history(request: Request, db=Depends(get_db), "dimension": dimension, "dim_value": dim_value, "dim_options": dim_options, "runs_count": runs_count}) return templates.TemplateResponse("qualys_dashboard_history.html", ctx) + + +# === DOUBLONS QUALYS === + +@router.get("/qualys/duplicates", response_class=HTMLResponse) +async def qualys_duplicates(request: Request, db=Depends(get_db), + refresh: int = 0): + user = get_current_user(request) + if not user: + return RedirectResponse(url="/login") + perms = get_user_perms(db, user) + if not can_view(perms, "qualys"): + return RedirectResponse(url="/dashboard") + from app.services.qualys_service import find_duplicate_hostnames + data = find_duplicate_hostnames(db, force_refresh=bool(refresh)) + ctx = base_context(request, db, user) + ctx.update({"dupes_data": data, + "can_delete": can_edit(perms, "qualys")}) + return templates.TemplateResponse("qualys_duplicates.html", ctx) + + +@router.post("/qualys/asset/{asset_id}/delete") +async def qualys_asset_delete(request: Request, asset_id: int, db=Depends(get_db)): + from fastapi.responses import JSONResponse + user = get_current_user(request) + if not user: + return JSONResponse({"ok": False, "msg": "Non authentifie"}, status_code=401) + perms = get_user_perms(db, user) + if not can_edit(perms, "qualys"): + return JSONResponse({"ok": False, "msg": "Permission refusee"}, status_code=403) + from app.services.qualys_service import delete_qualys_asset + res = delete_qualys_asset(db, asset_id) + return JSONResponse(res) diff --git a/app/services/qualys_service.py b/app/services/qualys_service.py index b476031..662e064 100644 --- a/app/services/qualys_service.py +++ b/app/services/qualys_service.py @@ -1104,3 +1104,119 @@ def load_vuln_history(db, period="day", days=30, dimension="global", dimension_v "high": r.high, "medium": r.medium, "sain": r.sain, "non_scanne": r.non_scanne, "vuln_total": r.critical + r.high + r.medium} for b, r in sorted(by_bucket.items())] + + +# =========================================================================== +# DOUBLONS QUALYS β€” liste + suppression via API +# =========================================================================== + +def fetch_all_qualys_assets(db, with_progress=False): + """Recupere TOUS les assets Qualys via API avec pagination (lastId). + Retourne liste de dicts {id, name, ip, last_check, agent_status}.""" + qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) + if not qualys_user: + return [] + proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None + all_assets = [] + last_id = 0 + while True: + body = {"ServiceRequest": {"preferences": {"limitResults": 1000}}} + if last_id: + body["ServiceRequest"]["filters"] = {"Criteria": [ + {"field": "id", "operator": "GREATER", "value": str(last_id)} + ]} + try: + r = requests.post( + f"{qualys_url}/qps/rest/2.0/search/am/hostasset", + json=body, auth=(qualys_user, qualys_pass), + verify=False, timeout=180, proxies=proxies, + headers={"Content-Type": "application/json"} + ) + except Exception: + break + if r.status_code != 200 or "SUCCESS" not in r.text: + break + batch = [] + for block in r.text.split("")[1:]: + block = block.split("")[0] + aid = (parse_xml(block, "id") or [""])[0] + name = (parse_xml(block, "name") or [""])[0] + addr = (parse_xml(block, "address") or [""])[0] + last_check = "" + if "" in block: + last_check = (parse_xml(block, "lastCheckedIn") or [""])[0] + agent_status = "" + if "" in block: + agent_status = (parse_xml(block, "status") or [""])[0] + if aid and aid.isdigit(): + batch.append({"id": int(aid), "name": name, "ip": addr, + "last_check": last_check, "agent_status": agent_status}) + if not batch: + break + all_assets.extend(batch) + last_id = max(b["id"] for b in batch) + if len(batch) < 1000: + break + return all_assets + + +def find_duplicate_hostnames(db, force_refresh=False): + """Identifie les hostnames en doublon. Cache 10min.""" + cache_key = "qualys:duplicates" + if not force_refresh: + cached = _cache.get(cache_key) + if cached is not None: + return cached + + from collections import defaultdict + assets = fetch_all_qualys_assets(db) + groups = defaultdict(list) + for a in assets: + shortname = a["name"].split(".")[0].lower() if a["name"] else "" + if not shortname: + continue + # Filtrer les hostnames qui sont des IP (ex: "192", "10") + if shortname.replace(".", "").isdigit(): + continue + groups[shortname].append(a) + + dupes = [] + for shortname, items in groups.items(): + if len(items) <= 1: + continue + # Trier par last_check desc (plus recent en premier) + items.sort(key=lambda x: x["last_check"] or "", reverse=True) + dupes.append({ + "shortname": shortname, + "count": len(items), + "assets": items, + "newest_check": items[0]["last_check"] or "", + "oldest_check": items[-1]["last_check"] or "", + }) + # Trier par count desc puis shortname + dupes.sort(key=lambda x: (-x["count"], x["shortname"])) + result = {"total_assets": len(assets), "duplicate_hostnames": len(dupes), + "total_zombies": sum(d["count"] - 1 for d in dupes), "items": dupes} + _cache.set(cache_key, result, CACHE_TTL) + return result + + +def delete_qualys_asset(db, asset_id): + """Supprime un asset Qualys via API. Retourne {ok, msg}.""" + qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) + if not qualys_user: + return {"ok": False, "msg": "Pas de creds"} + proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None + try: + r = requests.post( + f"{qualys_url}/qps/rest/2.0/delete/am/hostasset/{int(asset_id)}", + auth=(qualys_user, qualys_pass), verify=False, timeout=60, proxies=proxies, + headers={"Content-Type": "application/json"} + ) + if r.status_code == 200 and "SUCCESS" in r.text: + # Invalider le cache pour forcer un refresh + _cache.delete("qualys:duplicates") + return {"ok": True, "msg": "Supprime"} + return {"ok": False, "msg": f"HTTP {r.status_code}: {r.text[:300]}"} + except Exception as e: + return {"ok": False, "msg": str(e)} diff --git a/app/templates/base.html b/app/templates/base.html index d7c9529..3610103 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -139,6 +139,7 @@ ↳ Tags V3 (vue) ↳ Tags V3 (catalogue) ↳ Tags V3 (gap) + 🧟 Doublons Agents {% if p.qualys in ('edit', 'admin') %}DΓ©ployer Agent{% endif %} diff --git a/app/templates/qualys_duplicates.html b/app/templates/qualys_duplicates.html new file mode 100644 index 0000000..3b1a6da --- /dev/null +++ b/app/templates/qualys_duplicates.html @@ -0,0 +1,107 @@ +{% extends 'base.html' %} +{% block title %}Doublons Qualys{% endblock %} +{% block content %} + +
+

Doublons Qualys (assets zombies)

+ πŸ”„ Re-scan API +
+ +
+
+
Total assets Qualys
+
{{ dupes_data.total_assets }}
+
+
+
Hostnames en doublon
+
{{ dupes_data.duplicate_hostnames }}
+
+
+
Zombies a purger
+
{{ dupes_data.total_zombies }}
+
(= total - 1 par hostname doublonne)
+
+
+ +{% if not dupes_data.items %} +
+ Aucun doublon detecte. Si la page semble vide, clique sur "Re-scan API" pour forcer un scan complet. +
+{% else %} + +
+

+ Le plus recent (en haut de chaque groupe) est probablement l'asset actif. Les autres sont des zombies (anciennes installations, rΓ©-IPs, doublons de scan). + Bouton Supprimer = appel API Qualys POST /qps/rest/2.0/delete/am/hostasset/{id}. + {% if not can_delete %}
Tu n'as pas la permission edit pour supprimer.{% endif %} +

+ + + + + + + + + + + + + + {% for grp in dupes_data.items %} + {% for a in grp.assets %} + + + + + + + + + + {% endfor %} + {% endfor %} + +
HostnameNbID QualysIPAgentLast check-inAction
{% if loop.first %}{{ grp.shortname }} ({{ grp.count }}){% endif %}{% if loop.first %}{{ loop.length }}{% endif %}{{ a.id }}{{ a.ip or '-' }}{{ a.agent_status or '-' }} + {{ a.last_check[:16] if a.last_check else '(jamais)' }} + + {% if loop.first %} + βœ“ actif + {% elif can_delete %} + + {% else %} + zombie + {% endif %} +
+
+ + + +{% endif %} +{% endblock %}