diff --git a/app/routers/qualys.py b/app/routers/qualys.py index 216ed9a..55b50f4 100644 --- a/app/routers/qualys.py +++ b/app/routers/qualys.py @@ -432,6 +432,124 @@ async def qualys_search(request: Request, db=Depends(get_db), return templates.TemplateResponse("qualys_search.html", ctx) +@router.get("/qualys/vulns/{ip}", response_class=HTMLResponse) +async def qualys_vulns_detail(request: Request, ip: str, db=Depends(get_db)): + """Retourne le detail des vulns severity 3,4,5 pour une IP (fragment HTMX)""" + user = get_current_user(request) + if not user: + return HTMLResponse("

Non autorise

") + + from ..services.qualys_service import _get_qualys_creds, parse_xml + import requests as _req, urllib3, re as _re + urllib3.disable_warnings() + + qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db) + proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None + + def parse_cdata(txt, tag): + import re + m = re.search(rf'<{tag}>\s*(?:)?\s*', txt, re.DOTALL) + return m.group(1).strip() if m else "" + + # 1. Detections + try: + r = _req.post(f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/", + data={"action": "list", "ips": ip, "severities": "3,4,5", + "status": "New,Active,Re-Opened", "show_results": "1", "output_format": "XML"}, + auth=(qualys_user, qualys_pass), verify=False, timeout=90, proxies=proxies, + headers={"X-Requested-With": "Python"}) + except Exception as e: + return HTMLResponse(f"

Erreur API: {e}

") + + detections = [] + qids = [] + for det in r.text.split("")[1:]: + det = det.split("")[0] + qid = (parse_xml(det, "QID") or [""])[0] + sev = (parse_xml(det, "SEVERITY") or [""])[0] + dtype = (parse_xml(det, "TYPE") or [""])[0] + status = (parse_xml(det, "STATUS") or [""])[0] + results = parse_cdata(det, "RESULTS") + sev_i = int(sev) if sev.isdigit() else 0 + if sev_i < 3 or dtype not in ("Confirmed", "Potential"): + continue + detections.append({"qid": qid, "severity": sev_i, "type": dtype, "status": status, "results": results}) + if qid not in qids: + qids.append(qid) + + if not detections: + return HTMLResponse("

Aucune vulnerabilite active (severity 3+)

") + + # 2. KB pour titre, CVE, description + kb = {} + if qids: + try: + r2 = _req.post(f"{qualys_url}/api/2.0/fo/knowledge_base/vuln/", + data={"action": "list", "ids": ",".join(qids)}, + auth=(qualys_user, qualys_pass), verify=False, timeout=90, proxies=proxies, + headers={"X-Requested-With": "Python"}) + for vuln_block in r2.text.split("")[1:]: + vuln_block = vuln_block.split("")[0] + qid = (parse_xml(vuln_block, "QID") or [""])[0] + title = parse_cdata(vuln_block, "TITLE") + diagnosis = parse_cdata(vuln_block, "DIAGNOSIS") + solution = parse_cdata(vuln_block, "SOLUTION") + consequence = parse_cdata(vuln_block, "CONSEQUENCE") + # CVEs + cves = [] + if "" in vuln_block: + cve_block = vuln_block.split("")[1].split("")[0] + cve_ids = _re.findall(r'', cve_block) + if not cve_ids: + cve_ids = parse_xml(cve_block, "ID") + cves = cve_ids + # CVSS + cvss = "" + if "" in vuln_block: + cvss = (parse_xml(vuln_block.split("")[1].split("")[0], "BASE") or [""])[0] + kb[qid] = {"title": title, "diagnosis": diagnosis, "solution": solution, + "consequence": consequence, "cves": cves, "cvss3": cvss} + except Exception: + pass + + # 3. Construire HTML + html = f'

{len(detections)} vulnerabilite(s) — {ip}

' + html += '' + html += '' + html += '' + html += '' + + for d in sorted(detections, key=lambda x: -x["severity"]): + qid = d["qid"] + k = kb.get(qid, {}) + sev_class = "badge-red" if d["severity"] >= 5 else ("badge-yellow" if d["severity"] >= 4 else "badge-gray") + cve_links = " ".join([f'{c}' for c in k.get("cves", [])]) + + html += f'' + html += f'' + html += f'' + html += f'' + html += f'' + html += f'' + html += f'' + html += '' + + # Ligne detail + diag = k.get("diagnosis", "")[:300] + results = d.get("results", "")[:200] + html += f'' + + html += '
SevQIDTitreTypeCVECVSS3
{d["severity"]}{qid}{k.get("title", "N/A")}{d["type"]}{cve_links or "-"}{k.get("cvss3", "-")}
' + if diag: + html += f'Detection : {diag}
' + if results: + html += f'Resultat : {results}
' + if k.get("solution"): + html += f'Solution : {k["solution"][:200]}' + html += '
' + return HTMLResponse(html) + + @router.get("/qualys/asset/{asset_id}", response_class=HTMLResponse) async def qualys_asset_detail(request: Request, asset_id: int, db=Depends(get_db)): user = get_current_user(request) diff --git a/app/templates/qualys_search.html b/app/templates/qualys_search.html index 6cb92a3..057a3db 100644 --- a/app/templates/qualys_search.html +++ b/app/templates/qualys_search.html @@ -45,6 +45,9 @@ + + + {% if assets %} @@ -169,11 +172,13 @@ function updateBulkTag() { {% set vc = vuln_map.get(ip|string, {}) if vuln_map else {} %} {% if vc and vc.total > 0 %} - + {% elif vc is mapping %}0 {% else %}-{% endif %}