patchcenter/app/routers/qualys.py

1108 lines
49 KiB
Python

"""Router Qualys — Tags, Recherche assets, Décodeur nomenclature"""
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
import csv, io, re, os
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..services.qualys_service import (
sync_server_qualys, search_assets_api, get_all_tags_api,
create_tag_api, delete_tag_api, add_tag_to_asset_api,
remove_tag_from_asset_api, resync_all_tags, get_vuln_counts,
get_activation_keys, get_agents_summary, invalidate_search_cache, get_cache_stats,
)
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
# Nomenclature maps (from server_decoder.py).
# ENV_MAP: environment letter -> (display label, Qualys tag name).
# _decode_hostname looks it up with the second character of the hostname.
ENV_MAP = {
    'p': ('Production', 'ENV-PRD'),
    'r': ('Recette', 'ENV-REC'),
    'i': ('Pré-Prod', 'ENV-PPR'),
    'v': ('Test', 'ENV-TST'),
    'd': ('Développement', 'ENV-DEV'),
    't': ('Test', 'ENV-TST'),
    's': ('Production', 'ENV-PRD'),
    'o': ('Pré-Prod', 'ENV-PPR'),
}

# DOMAIN_MAP: three-letter code -> (display label, Qualys tag name).
# Codes are grouped by the business domain they resolve to.
DOMAIN_MAP = {
    **dict.fromkeys(
        ('bot', 'boo', 'boc', 'afl', 'sup'),
        ('Flux Libre', 'DOM-FL'),
    ),
    **dict.fromkeys(
        ('dsi', 'cyb', 'vsa', 'iad', 'bur', 'aii',
         'ecm', 'log', 'bck', 'gaw', 'vid', 'sim'),
        ('Infrastructure', 'DOM-INF'),
    ),
    **dict.fromkeys(
        ('emv', 'pci'),
        ('EMV', 'TAG-EMV'),
    ),
    **dict.fromkeys(
        ('pea', 'osa', 'svp', 'adv', 'rpa'),
        ('Péage', 'DOM-PEA'),
    ),
    **dict.fromkeys(
        ('ame', 'tra', 'dai', 'pat', 'dep', 'exp', 'sig', 'rau'),
        ('Trafic', 'DOM-TRA'),
    ),
    **dict.fromkeys(
        ('dec', 'sas', 'bip'),
        ('BI', 'DOM-BI'),
    ),
    **dict.fromkeys(
        ('int', 'agt', 'pin', 'ech'),
        ('Gestion', 'DOM-GES'),
    ),
}

# Tag-name prefixes; NOTE(review): presumably marks tags managed automatically
# by the nomenclature decoder — usage is not visible in this part of the file.
AUTO_PREFIXES = ('ENV-', 'OS-', 'DOM-', 'TYP-', 'TAG-OBS', 'TAG-EMV')
def _save_api_results_to_db(db, assets):
"""Sauvegarde les résultats de recherche API dans la base locale"""
for a in assets:
if not a.get("qualys_asset_id"):
continue
hostname = (a.get("hostname") or a.get("name", "")).split(".")[0].lower()
os_val = a.get("os", "")
os_family = "linux" if any(k in os_val.lower() for k in ("linux", "red hat", "centos")) else "windows" if "windows" in os_val.lower() else None
srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
{"h": hostname}).fetchone()
server_id = srv.id if srv else None
db.execute(text("""
INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
agent_status, agent_version, last_checkin, server_id)
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
ON CONFLICT (qualys_asset_id) DO UPDATE SET
name=EXCLUDED.name, fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address,
os=EXCLUDED.os, os_family=EXCLUDED.os_family,
agent_status=EXCLUDED.agent_status, agent_version=EXCLUDED.agent_version,
last_checkin=EXCLUDED.last_checkin, server_id=COALESCE(EXCLUDED.server_id, qualys_assets.server_id),
updated_at=now()
"""), {
"qid": a["qualys_asset_id"], "name": a.get("name"), "hn": hostname,
"fqdn": a.get("fqdn") or None, "ip": a.get("ip_address") or None,
"os": os_val, "osf": os_family,
"ast": a.get("agent_status", ""), "av": a.get("agent_version", ""),
"lc": a.get("last_checkin") or None, "sid": server_id,
})
if a.get("tags"):
db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"),
{"qid": a["qualys_asset_id"]})
for tag_name in a["tags"]:
tag_row = db.execute(text("SELECT qualys_tag_id FROM qualys_tags WHERE name = :n"),
{"n": tag_name}).fetchone()
if tag_row:
db.execute(text("""
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
VALUES (:qid, :tid) ON CONFLICT DO NOTHING
"""), {"qid": a["qualys_asset_id"], "tid": tag_row.qualys_tag_id})
db.commit()
def _decode_hostname(hostname):
"""Décode un hostname SANEF et retourne les tags suggérés"""
hn = hostname.lower().strip()
if hn.startswith('ls-'):
return {'type': 'Physique', 'env': 'Production', 'domain': 'Péage', 'tags': ['ENV-PRD', 'DOM-PEA', 'TYP-SRV', 'OS-WIN']}
if len(hn) < 4:
return {'type': '?', 'env': '?', 'domain': '?', 'tags': []}
# Type
machine = 'VM' if hn[0] == 'v' else 'Physique'
eqt = 'TYP-VIR' if hn[0] == 'v' else 'TYP-SRV'
# Env
env_info = ENV_MAP.get(hn[1], ('?', None))
env_name, env_tag = env_info
# Domain (try 4, 3, 2 char prefixes)
rest = hn[2:]
domain_name, domain_tag = '?', None
for length in (4, 3, 2):
prefix = rest[:length]
if prefix in DOMAIN_MAP:
domain_name, domain_tag = DOMAIN_MAP[prefix]
break
tags = [t for t in [env_tag, domain_tag, eqt] if t]
return {'type': machine, 'env': env_name, 'domain': domain_name, 'tags': tags}
# === TAGS ===
@router.get("/qualys/tags", response_class=HTMLResponse)
async def qualys_tags(request: Request, db=Depends(get_db),
                      search: str = Query(None), tag_type: str = Query(None)):
    """Render the Qualys tag list page.

    ``search`` filters on the tag name (case-insensitive substring) and
    ``tag_type`` restricts to dynamic ("dyn") or static ("stat") tags.
    Anonymous users go to /login, users without the "qualys" view
    permission go to /dashboard.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    # Only fixed SQL fragments are appended; user input goes through binds.
    conditions = ["1=1"]
    bind = {}
    if search:
        conditions.append("qt.name ILIKE :q")
        bind["q"] = f"%{search}%"
    type_condition = {
        "dyn": "qt.is_dynamic = true",
        "stat": "qt.is_dynamic = false",
    }.get(tag_type)
    if type_condition is not None:
        conditions.append(type_condition)
    where_sql = " AND ".join(conditions)

    tag_rows = db.execute(text(f"""
        SELECT qt.*,
            (SELECT COUNT(*) FROM qualys_asset_tags qat WHERE qat.qualys_tag_id = qt.qualys_tag_id) as asset_count
        FROM qualys_tags qt WHERE {where_sql} ORDER BY qt.name
    """), bind).fetchall()

    # Global counters, independent of the active filters.
    totals = db.execute(text("""
        SELECT COUNT(*) as total,
            COUNT(*) FILTER (WHERE is_dynamic) as dyn,
            COUNT(*) FILTER (WHERE NOT is_dynamic) as stat
        FROM qualys_tags
    """)).fetchone()

    from ..services.qualys_service import is_refresh_running
    refresh_in_progress = is_refresh_running()

    ctx = base_context(request, db, user)
    ctx["app_name"] = APP_NAME
    ctx["tags"] = tag_rows
    ctx["stats"] = totals
    ctx["search"] = search
    ctx["tag_type"] = tag_type
    ctx["can_edit_qualys"] = can_edit(perms, "qualys")
    ctx["msg"] = request.query_params.get("msg")
    ctx["sync_running"] = refresh_in_progress
    return templates.TemplateResponse("qualys_tags.html", ctx)
@router.post("/qualys/tags/resync")
async def qualys_tags_resync(request: Request, db=Depends(get_db)):
    """Resynchronise all Qualys tags, then redirect to the tag list.

    The outcome is passed in the ``msg`` query parameter: ``busy`` when an
    agent refresh is already running, ``resync_ok`` on success, or
    ``resync_ko_<error text>`` on failure.
    """
    from urllib.parse import quote_plus

    # 303 everywhere: this is a POST handler and the targets are GET pages.
    # The default 307 would make the browser replay the POST on them.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    # Refuse while an agent sync is running (it uses the same Qualys API).
    from ..services.qualys_service import is_refresh_running
    if is_refresh_running():
        return RedirectResponse(url="/qualys/tags?msg=busy", status_code=303)

    result = resync_all_tags(db)
    if result.get("ok"):
        msg = "resync_ok"
    else:
        # Carry the real error text, truncated, and properly URL-encoded so
        # characters such as '&', '#' or '%' cannot corrupt the query string.
        err = quote_plus((result.get("msg") or "Erreur API Qualys")[:120])
        msg = f"resync_ko_{err}"
    return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
@router.post("/qualys/tags/create")
async def qualys_tag_create(request: Request, db=Depends(get_db),
                            tag_name: str = Form(...)):
    """Create a Qualys tag from the submitted name, then redirect to the list.

    The redirect carries ``msg=created`` or ``msg=create_error``.
    """
    # 303 so the browser follows the redirect with GET after this POST
    # (the default 307 would replay the POST on a GET-only page).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    name = tag_name.strip()
    if not name:
        # A blank name cannot be a valid tag; skip the API round-trip.
        return RedirectResponse(url="/qualys/tags?msg=create_error", status_code=303)

    result = create_tag_api(db, name)
    msg = "created" if result.get("ok") else "create_error"
    return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
@router.post("/qualys/tags/{tag_id}/delete")
async def qualys_tag_delete(request: Request, tag_id: int, db=Depends(get_db)):
    """Delete the Qualys tag ``tag_id``, then redirect to the tag list.

    The redirect carries ``msg=deleted`` or ``msg=delete_error``.
    """
    # 303 so the browser follows the redirect with GET after this POST
    # (the default 307 would replay the POST on a GET-only page).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    result = delete_tag_api(db, tag_id)
    msg = "deleted" if result.get("ok") else "delete_error"
    return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
@router.post("/qualys/asset/{asset_id}/tag/add")
async def qualys_asset_tag_add(request: Request, asset_id: int, db=Depends(get_db),
                               tag_id: str = Form(...)):
    """Attach a tag to one asset and return a small HTML status fragment.

    The fragment is a ``<span>`` coloured green on success and red on
    failure, containing the message returned by the service.
    """
    from html import escape

    # 303 so the browser follows the redirect with GET after this POST.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    # tag_id arrives as a free-form string; reject garbage instead of
    # letting int() raise and turn into a 500.
    try:
        tag = int(tag_id)
    except ValueError:
        return HTMLResponse('<span class="text-xs text-cyber-red">Tag invalide</span>', status_code=400)

    result = add_tag_to_asset_api(db, asset_id, tag)
    color = "text-cyber-green" if result.get("ok") else "text-cyber-red"
    # The message comes from the remote API: escape it before embedding.
    message = escape(str(result.get("msg", "")))
    return HTMLResponse(f'<span class="text-xs {color}">{message}</span>')
@router.post("/qualys/asset/{asset_id}/tag/remove")
async def qualys_asset_tag_remove(request: Request, asset_id: int, db=Depends(get_db),
                                  tag_id: str = Form(...)):
    """Detach a tag from one asset and return a small HTML status fragment.

    The fragment is a ``<span>`` coloured green on success and red on
    failure, containing the message returned by the service.
    """
    from html import escape

    # 303 so the browser follows the redirect with GET after this POST.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    # tag_id arrives as a free-form string; reject garbage instead of
    # letting int() raise and turn into a 500.
    try:
        tag = int(tag_id)
    except ValueError:
        return HTMLResponse('<span class="text-xs text-cyber-red">Tag invalide</span>', status_code=400)

    result = remove_tag_from_asset_api(db, asset_id, tag)
    color = "text-cyber-green" if result.get("ok") else "text-cyber-red"
    # The message comes from the remote API: escape it before embedding.
    message = escape(str(result.get("msg", "")))
    return HTMLResponse(f'<span class="text-xs {color}">{message}</span>')
def _bulk_return_url(form, msg):
"""Construit l'URL de retour avec la recherche conservée"""
s = form.get("return_search", "")
f = form.get("return_field", "hostname")
return f"/qualys/search?field={f}&search={s}&msg={msg}"
@router.post("/qualys/bulk/add-tag")
async def qualys_bulk_add_tag(request: Request, db=Depends(get_db)):
    """Attach one tag to every selected asset, then return to the search page.

    Form fields: ``asset_ids`` (comma-separated integers), ``tag_id``, plus
    the ``return_*`` fields consumed by ``_bulk_return_url``. The redirect
    carries ``msg=bulk_add_<ok>_<ko>`` with the success and failure counts.
    """
    # 303 so the browser follows the redirect with GET after this POST.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    form = await request.form()
    # isdecimal() matches exactly the strings int() accepts here; entries
    # that are not plain integers are ignored.
    ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdecimal()]
    # A non-numeric tag_id is treated like a missing one (0) instead of
    # raising and producing a 500.
    try:
        tid = int(form.get("tag_id", "0") or "0")
    except (TypeError, ValueError):
        tid = 0

    ok = 0
    ko = 0
    for aid in ids:
        if add_tag_to_asset_api(db, aid, tid)["ok"]:
            ok += 1
        else:
            ko += 1
    return RedirectResponse(url=_bulk_return_url(form, f"bulk_add_{ok}_{ko}"), status_code=303)
@router.post("/qualys/bulk/remove-tag")
async def qualys_bulk_remove_tag(request: Request, db=Depends(get_db)):
    """Detach one tag from every selected asset, then return to the search page.

    Form fields: ``asset_ids`` (comma-separated integers), ``tag_id``, plus
    the ``return_*`` fields consumed by ``_bulk_return_url``. The redirect
    carries ``msg=bulk_rm_<ok>_<ko>`` with the success and failure counts.
    """
    # 303 so the browser follows the redirect with GET after this POST.
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags", status_code=303)

    form = await request.form()
    # isdecimal() matches exactly the strings int() accepts here; entries
    # that are not plain integers are ignored.
    ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdecimal()]
    # A non-numeric tag_id is treated like a missing one (0) instead of
    # raising and producing a 500.
    try:
        tid = int(form.get("tag_id", "0") or "0")
    except (TypeError, ValueError):
        tid = 0

    ok = 0
    ko = 0
    for aid in ids:
        if remove_tag_from_asset_api(db, aid, tid)["ok"]:
            ok += 1
        else:
            ko += 1
    return RedirectResponse(url=_bulk_return_url(form, f"bulk_rm_{ok}_{ko}"), status_code=303)
@router.post("/qualys/resync-assets")
async def qualys_resync_assets(request: Request, db=Depends(get_db)):
    """Re-fetch the selected assets from the Qualys API and store them locally.

    Form field ``asset_ids`` is a comma-separated list of integers. Each id
    is looked up individually through the API; assets that are found are
    saved with ``_save_api_results_to_db``. The redirect back to the search
    page carries ``msg=resync_<count>`` with the number of refreshed assets.
    """
    # 303 so the browser follows the redirect with GET after this POST
    # (the default 307 would replay the POST on a GET-only page).
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return RedirectResponse(url="/qualys/search", status_code=303)

    form = await request.form()
    # isdecimal() matches exactly the strings int() accepts here; entries
    # that are not plain integers are ignored.
    ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdecimal()]

    ok = 0
    for aid in ids:
        result = search_assets_api(db, str(aid), field="id", operator="EQUALS")
        if result.get("ok") and result.get("assets"):
            _save_api_results_to_db(db, result["assets"])
            ok += 1
    return RedirectResponse(url=_bulk_return_url(form, f"resync_{ok}"), status_code=303)
@router.get("/qualys/bulk/tags-for-assets")
async def qualys_tags_for_assets(request: Request, db=Depends(get_db),
                                 asset_ids: str = Query("")):
    """Return the static tags assigned to the selected assets (JSON).

    ``asset_ids`` is a comma-separated list of integers. The response is a
    JSON list of ``{"id", "name", "count"}`` objects, where ``count`` is the
    number of selected assets carrying the tag. An empty list is returned
    for anonymous users, users without the "qualys" view permission, or an
    empty selection.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse([])
    # Same permission gate as the other Qualys read endpoints of this router.
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return JSONResponse([])

    # isdecimal() matches exactly the strings int() accepts here.
    ids = [int(x) for x in asset_ids.split(",") if x.strip().isdecimal()]
    if not ids:
        return JSONResponse([])

    # One named bind per id; only generated placeholder names are
    # interpolated into the SQL text, never user input.
    params = {f"id{i}": v for i, v in enumerate(ids)}
    placeholders = ",".join(f":{key}" for key in params)
    rows = db.execute(text(f"""
        SELECT qt.qualys_tag_id as id, qt.name, COUNT(DISTINCT qat.qualys_asset_id) as count
        FROM qualys_asset_tags qat
        JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
        WHERE qat.qualys_asset_id IN ({placeholders}) AND qt.is_dynamic = false
        GROUP BY qt.qualys_tag_id, qt.name
        ORDER BY qt.name
    """), params).fetchall()
    return JSONResponse([{"id": r.id, "name": r.name, "count": r.count} for r in rows])
@router.get("/qualys/tags/export")
async def qualys_tags_export(request: Request, db=Depends(get_db)):
    """Export every Qualys tag as a semicolon-separated CSV download.

    Columns: name, Qualys id, type ("DYN"/"STAT") and number of assets
    carrying the tag. The payload starts with a UTF-8 BOM.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/qualys/tags")

    # The asset count uses the same sub-select as the tag list page, so the
    # "Nb assets" column is actually filled instead of always being empty.
    tags = db.execute(text("""
        SELECT qt.name, qt.qualys_tag_id, qt.is_dynamic,
            (SELECT COUNT(*) FROM qualys_asset_tags qat
             WHERE qat.qualys_tag_id = qt.qualys_tag_id) AS asset_count
        FROM qualys_tags qt ORDER BY qt.name
    """)).fetchall()

    output = io.StringIO()
    writer = csv.writer(output, delimiter=";")
    writer.writerow(["Nom", "ID Qualys", "Type", "Nb assets"])
    writer.writerows(
        [t.name, t.qualys_tag_id, "DYN" if t.is_dynamic else "STAT", t.asset_count]
        for t in tags
    )
    return StreamingResponse(iter(["\ufeff" + output.getvalue()]), media_type="text/csv",
                             headers={"Content-Disposition": "attachment; filename=qualys_tags.csv"})
# === RECHERCHE ASSETS ===
@router.get("/qualys/search", response_class=HTMLResponse)
async def qualys_search(request: Request, db=Depends(get_db),
                        search: str = Query(None), field: str = Query("hostname")):
    """Render the asset search page.

    ``field`` selects the search mode ("hostname", "ip" or "tag"). When the
    local database already holds matching rows updated in the last 24 hours
    the Qualys API is not called; otherwise the API is queried and its
    results are stored locally. The displayed rows are always read back
    from the local database. ``?force=1`` bypasses the freshness check.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    assets = []
    api_msg = None
    source = None
    force = request.query_params.get("force", "") == "1"
    cache_info = get_cache_stats()

    if search:
        from datetime import datetime, timedelta

        cutoff = datetime.now() - timedelta(hours=24)
        contains = f"%{search}%"
        starts_with = f"{search}%"

        # --- 1. How many matching local rows are younger than 24h? ---
        if force:
            fresh = 0  # Force an API resync
        elif field == "hostname":
            fresh = db.execute(text("""
                SELECT COUNT(*) FROM qualys_assets
                WHERE (hostname ILIKE :q OR name ILIKE :q OR fqdn ILIKE :q)
                AND updated_at > :cutoff
            """), {"q": contains, "cutoff": cutoff}).scalar()
        elif field == "ip":
            fresh = db.execute(text("""
                SELECT COUNT(*) FROM qualys_assets
                WHERE CAST(ip_address AS TEXT) LIKE :q AND updated_at > :cutoff
            """), {"q": starts_with, "cutoff": cutoff}).scalar()
        elif field == "tag":
            fresh = db.execute(text("""
                SELECT COUNT(*) FROM qualys_assets qa
                WHERE qa.qualys_asset_id IN (
                    SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
                    JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                    WHERE qt.name ILIKE :q
                ) AND qa.updated_at > :cutoff
            """), {"q": contains, "cutoff": cutoff}).scalar()
        else:
            fresh = 0

        # --- 2. Fresh local data: no API call. Otherwise query the API. ---
        if fresh > 0:
            source = "base (< 24h)"
        else:
            # API call (cached 10 min) + storage in the local database.
            api_target = {
                "hostname": ("name", "CONTAINS"),
                "ip": ("address", "CONTAINS"),
                "tag": ("tagName", "EQUALS"),
            }.get(field)
            if api_target is None:
                result = {"ok": False, "msg": "Champ inconnu", "assets": []}
            else:
                api_field, api_operator = api_target
                result = search_assets_api(db, search, field=api_field,
                                           operator=api_operator, force_refresh=force)

            if result.get("ok") and result.get("assets"):
                _save_api_results_to_db(db, result["assets"])
                source = f"API Qualys ({len(result['assets'])} résultats)"
            elif not result.get("ok"):
                # Fall back on the local database even if it is not fresh.
                source = f"Erreur API: {result.get('msg', '?')} — fallback base"
            else:
                source = "API: aucun résultat"

        # --- 3. Always read the displayed rows from the local database. ---
        db_conditions = ["1=1"]
        db_binds = {}
        if field == "hostname":
            db_conditions.append("(qa.hostname ILIKE :q OR qa.name ILIKE :q OR qa.fqdn ILIKE :q)")
            db_binds["q"] = contains
        elif field == "ip":
            db_conditions.append("CAST(qa.ip_address AS TEXT) LIKE :q")
            db_binds["q"] = starts_with
        elif field == "tag":
            db_conditions.append("""qa.qualys_asset_id IN (
                SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
                JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                WHERE qt.name ILIKE :q)""")
            db_binds["q"] = contains
        where_sql = " AND ".join(db_conditions)

        assets = db.execute(text(f"""
            SELECT qa.*, s.hostname as srv_hostname, s.tier,
                (SELECT string_agg(qt.name, ', ' ORDER BY qt.name)
                 FROM qualys_asset_tags qat JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                 WHERE qat.qualys_asset_id = qa.qualys_asset_id) as tags_list
            FROM qualys_assets qa
            LEFT JOIN servers s ON qa.server_id = s.id
            WHERE {where_sql} ORDER BY qa.hostname LIMIT 200
        """), db_binds).fetchall()
        api_msg = f"{len(assets)} résultat(s) — source: {source}"

    # Enrich with vulnerability counts (severity 3,4,5, Confirmed/Potential,
    # Active); limited to the first 50 IPs and strictly best-effort.
    vuln_map = {}
    if assets:
        ips = [
            ip_text
            for ip_text in (str(row.ip_address) for row in assets if row.ip_address)
            if ip_text and ip_text != "None"
        ]
        if ips:
            try:
                vuln_map = get_vuln_counts(db, ",".join(ips[:50]), force_refresh=force)
            except Exception:
                pass

    all_tags = db.execute(text("SELECT qualys_tag_id, name FROM qualys_tags ORDER BY name")).fetchall()

    ctx = base_context(request, db, user)
    ctx["app_name"] = APP_NAME
    ctx["assets"] = assets
    ctx["search"] = search
    ctx["field"] = field
    ctx["api_msg"] = api_msg
    ctx["all_tags"] = all_tags
    ctx["vuln_map"] = vuln_map
    ctx["cache_info"] = cache_info
    ctx["can_edit_qualys"] = can_edit(perms, "qualys")
    ctx["msg"] = request.query_params.get("msg")
    return templates.TemplateResponse("qualys_search.html", ctx)
@router.get("/qualys/agents", response_class=HTMLResponse)
def qualys_agents_page(request: Request, db=Depends(get_db)):
    """Render the Qualys agents overview page.

    Shows the activation keys, the agent summary, the servers that have no
    matching row in ``qualys_assets`` and the agents whose status contains
    "inactive". Failures while loading keys or summary fall back to empty
    values so the page still renders.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    try:
        keys = get_activation_keys(db)
    except Exception:
        keys = []
    try:
        summary = get_agents_summary(db)
    except Exception:
        summary = {"statuses": [], "versions": [], "total_assets": 0, "active": 0, "inactive": 0}

    # Servers without a Qualys asset of the same hostname.
    missing_rows = db.execute(text("""
        SELECT s.hostname, s.os_family, s.etat, d.name as domain, e.name as env, z.name as zone
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN zones z ON s.zone_id = z.id
        WHERE NOT EXISTS (SELECT 1 FROM qualys_assets qa WHERE qa.hostname = s.hostname)
        ORDER BY s.hostname
    """)).fetchall()
    no_agent = []
    for row in missing_rows:
        no_agent.append({
            "hostname": row.hostname, "os_family": row.os_family, "etat": row.etat,
            "domain": row.domain or "", "env": row.env or "", "zone": row.zone or "",
        })

    # Inactive agents.
    stale_rows = db.execute(text("""
        SELECT qa.hostname, qa.os, qa.agent_version, qa.last_checkin, s.etat
        FROM qualys_assets qa
        LEFT JOIN servers s ON qa.server_id = s.id
        WHERE qa.agent_status ILIKE '%inactive%'
        ORDER BY qa.hostname
    """)).fetchall()
    inactive = []
    for row in stale_rows:
        inactive.append({
            "hostname": row.hostname, "os": row.os, "agent_version": row.agent_version,
            "last_checkin": row.last_checkin, "etat": row.etat or "",
        })

    from ..services.qualys_service import is_refresh_running
    refresh_in_progress = is_refresh_running()

    ctx = base_context(request, db, user)
    ctx["app_name"] = APP_NAME
    ctx["keys"] = keys
    ctx["summary"] = summary
    ctx["no_agent_servers"] = no_agent
    ctx["inactive_agents"] = inactive
    ctx["msg"] = request.query_params.get("msg", "")
    ctx["sync_running"] = refresh_in_progress
    return templates.TemplateResponse("qualys_agents.html", ctx)
@router.post("/qualys/agents/refresh")
def qualys_agents_refresh(request: Request, db=Depends(get_db), mode: str = "diff"):
    """Run a Qualys agent sync and report the outcome as JSON.

    ``mode`` is forwarded to ``refresh_all_agents`` ("diff" by default,
    "full" for a complete sync). Status codes: 401 unauthenticated, 403
    missing edit permission, 409 when the service reports ``busy``, 500 on
    failure, 200 with the stats on success.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.qualys_service import refresh_all_agents
    try:
        stats = refresh_all_agents(db, mode=mode)
        if stats.get("busy"):
            return JSONResponse(stats, status_code=409)
        if not stats.get("ok"):
            return JSONResponse(stats, status_code=500)
        # Prefer the service message; otherwise summarise the counters.
        summary_msg = stats.get("msg")
        if not summary_msg:
            summary_msg = f"{stats.get('created',0)} créés, {stats.get('updated',0)} mis à jour"
        payload = {"ok": True, "msg": summary_msg, "stats": stats}
        return JSONResponse(payload)
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({"ok": False, "msg": str(e)[:200]}, status_code=500)
@router.post("/qualys/agents/cancel")
def qualys_agents_cancel(request: Request, db=Depends(get_db)):
    """Cancel the running agent refresh and return the service answer as JSON.

    Status codes: 401 unauthenticated, 403 missing edit permission, 404 when
    no refresh is running.
    """
    from fastapi.responses import JSONResponse

    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False}, status_code=401)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return JSONResponse({"ok": False}, status_code=403)

    from ..services.qualys_service import cancel_refresh, is_refresh_running
    if is_refresh_running():
        return JSONResponse(cancel_refresh())
    return JSONResponse({"ok": False, "msg": "Aucun refresh en cours"}, status_code=404)
@router.get("/qualys/agents/export-no-agent")
async def export_no_agent_csv(request: Request, db=Depends(get_db)):
    """Download, as CSV, the servers that have no matching Qualys asset.

    Semicolon-separated, prefixed with a UTF-8 BOM; missing values are
    written as empty strings.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/qualys/agents")

    import io, csv as _csv
    servers = db.execute(text("""
        SELECT s.hostname, s.os_family, s.etat, d.name as domain, e.name as env, z.name as zone
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN zones z ON s.zone_id = z.id
        WHERE NOT EXISTS (SELECT 1 FROM qualys_assets qa WHERE qa.hostname = s.hostname)
        ORDER BY s.hostname
    """)).fetchall()

    buffer = io.StringIO()
    sheet = _csv.writer(buffer, delimiter=";")
    sheet.writerow(["Hostname", "OS", "Domaine", "Environnement", "Zone", "Etat"])
    sheet.writerows(
        [srv.hostname, srv.os_family or "", srv.domain or "", srv.env or "", srv.zone or "", srv.etat or ""]
        for srv in servers
    )
    buffer.seek(0)

    body = "\ufeff" + buffer.getvalue()
    return StreamingResponse(
        iter([body]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=serveurs_sans_agent.csv"})
@router.get("/qualys/agents/export-inactive")
async def export_inactive_csv(request: Request, db=Depends(get_db)):
    """Download, as CSV, the agents whose status contains "inactive".

    Semicolon-separated, prefixed with a UTF-8 BOM. The last check-in is
    reduced to the first 10 characters of its string form.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/qualys/agents")

    import io, csv as _csv
    agents = db.execute(text("""
        SELECT qa.hostname, qa.os, qa.agent_version, qa.last_checkin, s.etat
        FROM qualys_assets qa LEFT JOIN servers s ON qa.server_id = s.id
        WHERE qa.agent_status ILIKE '%inactive%' ORDER BY qa.hostname
    """)).fetchall()

    buffer = io.StringIO()
    sheet = _csv.writer(buffer, delimiter=";")
    sheet.writerow(["Hostname", "OS", "Version agent", "Dernier check-in", "Etat"])
    for agent in agents:
        if agent.last_checkin:
            checkin = str(agent.last_checkin)[:10]
        else:
            checkin = ""
        sheet.writerow([agent.hostname, agent.os or "", agent.agent_version or "", checkin, agent.etat or ""])
    buffer.seek(0)

    body = "\ufeff" + buffer.getvalue()
    return StreamingResponse(
        iter([body]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=agents_inactifs.csv"})
@router.get("/qualys/vulns/{ip}", response_class=HTMLResponse)
async def qualys_vulns_detail(request: Request, ip: str, db=Depends(get_db)):
    """Return the severity 3/4/5 vulnerability detail for one IP (HTMX fragment).

    Steps:
      1. serve the fragment from the cache when present (entries are stored
         for 600 seconds);
      2. list the host detections through the Qualys detection API and keep
         the Confirmed/Potential ones with severity >= 3;
      3. fetch title, CVEs, CVSS3, diagnosis and solution for the detected
         QIDs from the Qualys knowledge base (best effort);
      4. render an HTML table and cache it.

    Every value that originates from the URL or from the Qualys API is
    HTML-escaped before being embedded in the fragment.
    """
    from html import escape as _esc, unescape as _unesc

    user = get_current_user(request)
    if not user:
        return HTMLResponse("<p>Non autorise</p>", status_code=401)
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return HTMLResponse("<p>Acces interdit</p>", status_code=403)

    # Cache 10 min
    from ..services import cache as _cache
    cache_key = f"qualys:vulndetail:{ip}"
    cached_html = _cache.get(cache_key)
    if cached_html is not None:
        return HTMLResponse(cached_html + '<p class="text-xs text-gray-600 mt-1">(cache 10 min)</p>')

    from ..services.qualys_service import _get_qualys_creds, parse_xml
    import logging
    import re as _re
    import requests as _req
    import urllib3

    # NOTE(review): TLS verification is disabled for the Qualys calls below
    # (verify=False) and the resulting warnings are silenced. Kept as-is;
    # confirm this is required by the proxy setup.
    urllib3.disable_warnings()
    qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
    proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None

    def parse_cdata(txt, tag):
        """Return the text of <tag>, with an optional CDATA wrapper removed."""
        m = _re.search(rf'<{tag}>\s*(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?\s*</{tag}>', txt, _re.DOTALL)
        return m.group(1).strip() if m else ""

    def first(txt, tag):
        """Return the first value parse_xml finds for tag, or ''."""
        return (parse_xml(txt, tag) or [""])[0]

    def safe(value):
        """Escape API-provided text for HTML without double-encoding entities."""
        return _esc(_unesc(str(value)))

    # 1. Detections
    try:
        r = _req.post(f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/",
                      data={"action": "list", "ips": ip, "severities": "3,4,5",
                            "status": "New,Active,Re-Opened", "show_results": "1", "output_format": "XML"},
                      auth=(qualys_user, qualys_pass), verify=False, timeout=90, proxies=proxies,
                      headers={"X-Requested-With": "Python"})
    except Exception as e:
        return HTMLResponse(f"<p class='text-cyber-red'>Erreur API: {_esc(str(e))}</p>")

    detections = []
    qids = []
    for det in r.text.split("<DETECTION>")[1:]:
        det = det.split("</DETECTION>")[0]
        qid = first(det, "QID")
        sev = first(det, "SEVERITY")
        dtype = first(det, "TYPE")
        status = first(det, "STATUS")
        results = parse_cdata(det, "RESULTS")
        sev_i = int(sev) if sev.isdigit() else 0
        if sev_i < 3 or dtype not in ("Confirmed", "Potential"):
            continue
        detections.append({"qid": qid, "severity": sev_i, "type": dtype, "status": status, "results": results})
        if qid not in qids:
            qids.append(qid)

    if not detections:
        return HTMLResponse("<p class='text-gray-500 p-4'>Aucune vulnerabilite active (severity 3+)</p>")

    # 2. Knowledge base: title, CVE, description
    kb = {}
    if qids:
        try:
            r2 = _req.post(f"{qualys_url}/api/2.0/fo/knowledge_base/vuln/",
                           data={"action": "list", "ids": ",".join(qids)},
                           auth=(qualys_user, qualys_pass), verify=False, timeout=90, proxies=proxies,
                           headers={"X-Requested-With": "Python"})
            for vuln_block in r2.text.split("<VULN>")[1:]:
                vuln_block = vuln_block.split("</VULN>")[0]
                qid = first(vuln_block, "QID")
                # CVEs
                cves = []
                if "<CVE_LIST>" in vuln_block:
                    cve_block = vuln_block.split("<CVE_LIST>")[1].split("</CVE_LIST>")[0]
                    cve_ids = _re.findall(r'<!\[CDATA\[(CVE-[\d-]+)\]\]>', cve_block)
                    if not cve_ids:
                        cve_ids = parse_xml(cve_block, "ID")
                    cves = cve_ids
                # CVSS
                cvss = ""
                if "<CVSS_V3>" in vuln_block:
                    cvss = first(vuln_block.split("<CVSS_V3>")[1].split("</CVSS_V3>")[0], "BASE")
                kb[qid] = {"title": parse_cdata(vuln_block, "TITLE"),
                           "diagnosis": parse_cdata(vuln_block, "DIAGNOSIS"),
                           "solution": parse_cdata(vuln_block, "SOLUTION"),
                           "consequence": parse_cdata(vuln_block, "CONSEQUENCE"),
                           "cves": cves, "cvss3": cvss}
        except Exception as exc:
            # Best effort: the table is still rendered without KB details,
            # but the failure is no longer invisible.
            logging.getLogger(__name__).warning("Qualys KB lookup failed for QIDs %s: %s", qids, exc)

    # 3. Build the HTML (collected in a list, joined once at the end)
    out = [
        f'<div class="p-4"><h3 class="text-sm font-bold text-cyber-accent mb-3">{len(detections)} vulnerabilite(s) — {_esc(ip)}</h3>',
        '<table class="w-full table-cyber text-xs"><thead><tr>',
        '<th class="p-2">Sev</th><th class="p-2">QID</th><th class="text-left p-2">Titre</th>',
        '<th class="p-2">Type</th><th class="p-2">CVE</th><th class="p-2">CVSS3</th>',
        '</tr></thead><tbody>',
    ]
    for d in sorted(detections, key=lambda x: -x["severity"]):
        qid = d["qid"]
        k = kb.get(qid, {})
        sev_class = "badge-red" if d["severity"] >= 5 else ("badge-yellow" if d["severity"] >= 4 else "badge-gray")
        cve_links = " ".join(
            f'<a href="https://nvd.nist.gov/vuln/detail/{safe(c)}" target="_blank" class="text-cyber-accent hover:underline">{safe(c)}</a>'
            for c in k.get("cves", [])
        )
        out.append('<tr>')
        out.append(f'<td class="p-2 text-center"><span class="badge {sev_class}">{d["severity"]}</span></td>')
        out.append(f'<td class="p-2 text-center font-mono">{safe(qid)}</td>')
        out.append(f'<td class="p-2 text-cyber-accent">{safe(k.get("title", "N/A"))}</td>')
        out.append(f'<td class="p-2 text-center">{safe(d["type"])}</td>')
        out.append(f'<td class="p-2">{cve_links or "-"}</td>')
        out.append(f'<td class="p-2 text-center font-bold">{safe(k.get("cvss3", "-"))}</td>')
        out.append('</tr>')

        # Detail row
        out.append('<tr class="bg-cyber-hover/30"><td colspan="6" class="p-2 text-xs text-gray-400">')
        # Turn the scan results into a package table when they look like one.
        results = d.get("results", "")
        if results and ("Installed Version" in results or "Required Version" in results):
            # Format: "Package Installed Version Required Version\npkg1 ver1 ver2\npkg2..."
            lines = [l.strip() for l in results.replace("\t", " ").split("\n") if l.strip()]
            # Drop the header line(s)
            pkg_lines = [l for l in lines if not l.startswith(("Package", "Installed"))]
            if pkg_lines:
                out.append('<table class="w-full mt-1 mb-2" style="background:#0d1117;border-radius:4px;">')
                out.append('<thead><tr><th class="p-1 text-left text-gray-500">Package</th>')
                out.append('<th class="p-1 text-left text-cyber-red">Version installée</th>')
                out.append('<th class="p-1 text-left text-cyber-green">Version requise</th></tr></thead><tbody>')
                for line in pkg_lines:
                    parts = line.split()
                    if len(parts) >= 3:
                        pkg = parts[0]
                        installed = parts[1]
                        required = " ".join(parts[2:])
                        out.append(f'<tr><td class="p-1 font-mono text-cyber-accent">{safe(pkg)}</td>')
                        out.append(f'<td class="p-1 font-mono text-cyber-red">{safe(installed)}</td>')
                        out.append(f'<td class="p-1 font-mono text-cyber-green">{safe(required)}</td></tr>')
                    elif len(parts) >= 1:
                        out.append(f'<tr><td colspan="3" class="p-1 font-mono">{safe(line)}</td></tr>')
                out.append('</tbody></table>')
            else:
                out.append(f'<b>Résultat :</b> <span class="font-mono">{safe(results[:300])}</span><br>')
        elif results:
            out.append(f'<b>Résultat :</b> <span class="font-mono">{safe(results[:300])}</span><br>')

        # Diagnosis / solution: strip the KB's HTML tags, truncate, escape.
        diag = k.get("diagnosis", "")
        if diag:
            diag_clean = _re.sub(r'<[^>]+>', ' ', diag).strip()[:300]
            out.append(f'<b>Détection :</b> {safe(diag_clean)}<br>')
        if k.get("solution"):
            sol_clean = _re.sub(r'<[^>]+>', ' ', k["solution"]).strip()[:200]
            out.append(f'<b>Solution :</b> {safe(sol_clean)}')
        out.append('</td></tr>')
    out.append('</tbody></table></div>')

    html = "".join(out)
    _cache.set(cache_key, html, 600)
    return HTMLResponse(html)
@router.get("/qualys/asset/{asset_id}", response_class=HTMLResponse)
async def qualys_asset_detail(request: Request, asset_id: int, db=Depends(get_db)):
    """Render the detail partial for one Qualys asset.

    The template receives the asset row, its current tags, the tags
    suggested by decoding its hostname, and the full tag catalogue.
    Responds 401/403 with a short HTML paragraph when access is denied.
    """
    user = get_current_user(request)
    if not user:
        return HTMLResponse("<p>Non autorisé</p>", status_code=401)
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return HTMLResponse("<p>Acces interdit</p>", status_code=403)

    by_asset = {"aid": asset_id}
    asset = db.execute(
        text("SELECT * FROM qualys_assets WHERE qualys_asset_id = :aid"),
        by_asset,
    ).fetchone()
    if not asset:
        return HTMLResponse("<p>Asset non trouvé</p>")

    assigned_tags = db.execute(text("""
        SELECT qt.name, qt.is_dynamic, qt.qualys_tag_id
        FROM qualys_asset_tags qat JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
        WHERE qat.qualys_asset_id = :aid ORDER BY qt.name
    """), by_asset).fetchall()

    # Decode from the hostname, falling back on the asset name.
    decoded = _decode_hostname(asset.hostname or asset.name or "")

    catalogue = db.execute(text(
        "SELECT qualys_tag_id, name, is_dynamic FROM qualys_tags ORDER BY name"
    )).fetchall()

    context = {
        "request": request,
        "a": asset,
        "tags": assigned_tags,
        "decoded": decoded,
        "all_tags": catalogue,
    }
    return templates.TemplateResponse("partials/qualys_asset_detail.html", context)
@router.get("/qualys/search/export")
async def qualys_search_export(request: Request, db=Depends(get_db),
                               search: str = Query(""), field: str = Query("hostname")):
    """Export the local assets matching a search as a CSV download.

    The search is re-run against the local database only (no API call),
    with the same hostname / ip / tag filters as the search page, limited
    to 1000 rows. The CSV is semicolon-separated and starts with a UTF-8
    BOM. Without a search term every asset (up to the limit) is exported.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    # Same permission gate as the search page this export belongs to.
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    # Re-run the search
    where = ["1=1"]
    params = {}
    if search:
        if field == "hostname":
            where.append("(qa.hostname ILIKE :q OR qa.name ILIKE :q OR qa.fqdn ILIKE :q)")
            params["q"] = f"%{search}%"
        elif field == "ip":
            where.append("CAST(qa.ip_address AS TEXT) LIKE :q")
            params["q"] = f"{search}%"
        elif field == "tag":
            # Previously ignored: a tag search exported unfiltered assets.
            where.append("""qa.qualys_asset_id IN (
                SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
                JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                WHERE qt.name ILIKE :q)""")
            params["q"] = f"%{search}%"
    wc = " AND ".join(where)

    assets = db.execute(text(f"""
        SELECT qa.hostname, qa.ip_address, qa.fqdn, qa.os, qa.agent_status,
            qa.agent_version, qa.last_checkin, qa.qualys_asset_id
        FROM qualys_assets qa WHERE {wc} ORDER BY qa.hostname LIMIT 1000
    """), params).fetchall()

    output = io.StringIO()
    writer = csv.writer(output, delimiter=";")
    writer.writerow(["Hostname", "IP", "FQDN", "OS", "Agent", "Version", "Dernier check-in", "Qualys ID"])
    writer.writerows(
        [a.hostname, a.ip_address, a.fqdn, a.os, a.agent_status,
         a.agent_version, a.last_checkin, a.qualys_asset_id]
        for a in assets
    )

    # The search text is user input: keep only characters that are safe in
    # a Content-Disposition filename (no quotes, separators, CR/LF or
    # non-latin-1 characters that would break the header).
    safe_name = re.sub(r"[^A-Za-z0-9._-]+", "_", search or "").strip("_") or "all"
    return StreamingResponse(iter(["\ufeff" + output.getvalue()]), media_type="text/csv",
                             headers={"Content-Disposition": f"attachment; filename=qualys_assets_{safe_name}.csv"})
# === DECODEUR ===
@router.get("/qualys/decoder", response_class=HTMLResponse)
async def qualys_decoder(request: Request, db=Depends(get_db),
                         hostname: str = Query(None)):
    """Render the hostname decoder page.

    When ``hostname`` is supplied it is decoded with ``_decode_hostname`` and
    the tags of the matching asset, if any, are loaded so the template can
    show them next to the decoded result. Anonymous callers are redirected
    to /login, callers without view permission on "qualys" to /dashboard.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_view(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard")

    result, current_tags = None, []
    if hostname:
        result = _decode_hostname(hostname)
        result["hostname"] = hostname

        # Look up the asset by short name: the part before the first dot,
        # compared case-insensitively.
        short_name = hostname.strip().partition(".")[0].lower()
        lookup_stmt = text(
            "SELECT qualys_asset_id FROM qualys_assets WHERE LOWER(hostname) = LOWER(:h)"
        )
        asset = db.execute(lookup_stmt, {"h": short_name}).fetchone()

        if asset:
            # The leading/trailing newlines keep the statement text unchanged.
            tags_stmt = text(
                "\n"
                "SELECT qt.name, qt.is_dynamic FROM qualys_asset_tags qat\n"
                "JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id\n"
                "WHERE qat.qualys_asset_id = :aid ORDER BY qt.name\n"
            )
            tag_rows = db.execute(tags_stmt, {"aid": asset.qualys_asset_id}).fetchall()
            for tag_row in tag_rows:
                current_tags.append((tag_row.name, tag_row.is_dynamic))

    ctx = base_context(request, db, user)
    page_values = {
        "app_name": APP_NAME,
        "result": result,
        "hostname": hostname,
        "current_tags": current_tags,
        "auto_prefixes": AUTO_PREFIXES,
    }
    ctx.update(page_values)
    return templates.TemplateResponse("qualys_decoder.html", ctx)
# ═══════════════════════════════════════════════
# DEPLOIEMENT AGENT QUALYS
# ═══════════════════════════════════════════════
@router.get("/qualys/deploy", response_class=HTMLResponse)
async def qualys_deploy_page(request: Request, db=Depends(get_db)):
    """Render the Qualys agent deployment page.

    Gathers the available packages, every server with its Qualys agent
    information, the active activation keys split by OS, and the default
    activation keys / customer id / server URI used to pre-fill the form.
    Anonymous callers are redirected to /login, callers without edit
    permission on "qualys" to /dashboard.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard")

    from ..services.agent_deploy_service import list_packages
    from ..services.secrets_service import get_secret

    packages = list_packages()

    # The leading/trailing newlines keep the statement text unchanged.
    servers_stmt = text(
        "\n"
        "SELECT s.id, s.hostname, s.os_family, s.os_version, s.etat, s.ssh_user, s.ssh_port, s.ssh_method,\n"
        "d.name as domain, e.name as env,\n"
        "qa.agent_version, qa.agent_status, qa.last_checkin\n"
        "FROM servers s\n"
        "LEFT JOIN domain_environments de ON s.domain_env_id = de.id\n"
        "LEFT JOIN domains d ON de.domain_id = d.id\n"
        "LEFT JOIN environments e ON de.environment_id = e.id\n"
        "LEFT JOIN qualys_assets qa ON qa.server_id = s.id\n"
        "ORDER BY s.hostname\n"
    )
    servers = []
    for row in db.execute(servers_stmt).fetchall():
        entry = dict(row._mapping)
        if entry.get("last_checkin"):
            # Keep only the first 19 characters of the textual timestamp.
            entry["last_checkin"] = str(entry["last_checkin"])[:19]
        servers.append(entry)

    # Fetch the activation keys from the Qualys API, bypassing the cache.
    # Any failure degrades to "no keys" so the page still renders.
    from ..services.qualys_service import get_activation_keys
    try:
        api_keys = get_activation_keys(db, force_refresh=True) or []
    except Exception:
        api_keys = []

    def _is_active(key):
        return (key.get("status") or "").upper() == "ACTIVE"

    active_keys = [key for key in api_keys if _is_active(key)]

    # Split by OS according to the key's "type" field. Matching "LIN" also
    # covers "LINUX", so one substring test is enough for Linux.
    keys_windows, keys_linux = [], []
    for key in active_keys:
        key_type = (key.get("type") or "").upper()
        if "WIN" in key_type:
            keys_windows.append(key)
        if "LIN" in key_type:
            keys_linux.append(key)

    # Per-OS default: stored setting first, otherwise the first key of that type.
    first_windows_key = keys_windows[0]["key"] if keys_windows else ""
    default_act_win = get_secret(db, "qualys_activation_id_windows") or first_windows_key
    first_linux_key = keys_linux[0]["key"] if keys_linux else ""
    default_act_lin = get_secret(db, "qualys_activation_id_linux") or first_linux_key

    # Backward compatibility: fall back to the legacy single-key setting
    # when no Linux default is available.
    default_activation = default_act_lin or get_secret(db, "qualys_activation_id") or ""

    # Server URI: explicit setting if present, otherwise derived from the
    # Qualys URL (QG1 when neither "qg2" nor "qg3" appears in it).
    qualys_url = get_secret(db, "qualys_url") or ""
    server_uri = get_secret(db, "qualys_server_uri")
    if not server_uri:
        uri_by_marker = (
            ("qg2", "https://qagpublic.qg2.apps.qualys.eu/CloudAgent/"),
            ("qg3", "https://qagpublic.qg3.apps.qualys.eu/CloudAgent/"),
        )
        server_uri = next(
            (uri for marker, uri in uri_by_marker if marker in qualys_url),
            "https://qagpublic.qg1.apps.qualys.eu/CloudAgent/",
        )

    ctx = base_context(request, db, user)
    customer_id = get_secret(db, "qualys_customer_id") or ""
    page_values = {
        "app_name": APP_NAME,
        "packages": packages,
        "servers": servers,
        "activation_id": default_activation,
        "activation_id_windows": default_act_win,
        "activation_id_linux": default_act_lin,
        "available_keys": active_keys,
        "keys_windows": keys_windows,
        "keys_linux": keys_linux,
        "customer_id": customer_id,
        "server_uri": server_uri,
        "msg": request.query_params.get("msg", ""),
    }
    ctx.update(page_values)
    return templates.TemplateResponse("qualys_deploy.html", ctx)
@router.post("/qualys/deploy/run")
async def qualys_deploy_run(request: Request, db=Depends(get_db)):
    """Start a Qualys agent deployment job for the selected servers.

    The request body must be a JSON object. Keys read from it:
        server_ids: comma-separated string, single number, or list of ids.
        activation_id: legacy key, used as fallback for both OS families.
        activation_id_linux / activation_id_windows: per-OS keys.
        customer_id, server_uri, package_deb, package_rpm: passed to the job.
        force_downgrade: passed to the job (defaults to False).

    Non-empty activation keys, customer id and server URI are stored as
    secrets so they become the defaults next time; this step is best effort
    and never blocks the deployment.

    Returns a JSON response: ``{"ok": True, "job_id", "total"}`` on success,
    ``{"ok": False, "msg"}`` otherwise (401 unauthenticated, 403 missing edit
    permission, 400 unreadable body, 200 when no usable server was selected).
    """
    import logging
    from fastapi.responses import JSONResponse
    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "qualys"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.agent_deploy_service import start_deploy_job
    from ..services.secrets_service import get_secret, set_secret

    # A malformed or non-object body is a client error, not a server crash.
    try:
        body = await request.json()
    except ValueError:
        body = None
    if not isinstance(body, dict):
        return JSONResponse({"ok": False, "msg": "Corps JSON invalide"}, status_code=400)

    server_ids = body.get("server_ids", "")
    activation_id = body.get("activation_id", "")  # legacy / fallback
    activation_id_linux = body.get("activation_id_linux", "") or activation_id
    activation_id_windows = body.get("activation_id_windows", "") or activation_id
    customer_id = body.get("customer_id", "")
    server_uri = body.get("server_uri", "")
    package_deb = body.get("package_deb", "")
    package_rpm = body.get("package_rpm", "")
    force_downgrade = body.get("force_downgrade", False)

    # Persist the submitted values as defaults for the next visits.
    try:
        if activation_id_linux:
            set_secret(db, "qualys_activation_id_linux", activation_id_linux, "Activation Key Qualys Linux")
        if activation_id_windows:
            set_secret(db, "qualys_activation_id_windows", activation_id_windows, "Activation Key Qualys Windows")
        if customer_id:
            set_secret(db, "qualys_customer_id", customer_id, "Customer ID Qualys")
        if server_uri:
            set_secret(db, "qualys_server_uri", server_uri, "Server URI Qualys")
        db.commit()
    except Exception:
        # Best effort only, but roll back so that a failed statement does not
        # leave the transaction in an error state for the queries below.
        db.rollback()
        logging.getLogger(__name__).warning(
            "Could not persist Qualys deployment defaults", exc_info=True)

    # Accept either "1,2,3" or [1, 2, 3]. isdecimal() is used rather than
    # isdigit() because the latter accepts characters such as "²" that
    # int() rejects.
    if isinstance(server_ids, (list, tuple)):
        tokens = [str(item) for item in server_ids]
    else:
        tokens = str(server_ids).split(",")
    ids = [int(tok) for tok in (t.strip() for t in tokens) if tok.isdecimal()]
    if not ids:
        return JSONResponse({"ok": False, "msg": "Aucun serveur sélectionné"})

    ssh_key = get_secret(db, "ssh_key_file") or "/opt/patchcenter/keys/id_ed25519"

    # Safe to inline: every element of ``ids`` is an int produced above.
    placeholders = ",".join(str(i) for i in ids)
    rows = db.execute(text(f"""
        SELECT id, hostname, os_family, os_version, ssh_user, ssh_port, ssh_method
        FROM servers WHERE id IN ({placeholders})
    """)).fetchall()
    servers = [{"hostname": r.hostname, "os_family": r.os_family,
                "os_version": r.os_version, "ssh_user": r.ssh_user, "ssh_port": r.ssh_port} for r in rows]
    if not servers:
        # None of the submitted ids exists: do not start an empty job (and do
        # not let all() on an empty list pick the Windows key below).
        return JSONResponse({"ok": False, "msg": "Aucun serveur trouvé"})

    # Annotate each server with the activation key matching its OS family.
    for s in servers:
        osf = (s.get("os_family") or "").lower()
        s["_activation_id"] = activation_id_windows if osf == "windows" else activation_id_linux

    # start_deploy_job currently receives one global activation key: use the
    # Linux one unless every selected server is Windows.
    all_windows = all((s.get("os_family") or "").lower() == "windows" for s in servers)
    final_activation = activation_id_windows if all_windows else activation_id_linux

    job_id = start_deploy_job(servers, ssh_key, package_deb, package_rpm,
                              final_activation, customer_id, server_uri, force_downgrade=force_downgrade)

    from ..services.audit_service import log_action
    log_action(db, request, user, "qualys_deploy",
               entity_type="deploy_job",
               details={"job_id": job_id, "servers": len(servers)})
    db.commit()
    return JSONResponse({"ok": True, "job_id": job_id, "total": len(servers)})
@router.get("/qualys/deploy/status/{job_id}")
async def qualys_deploy_status(request: Request, job_id: str, db=Depends(get_db)):
    """Report the progress of a deployment job as JSON.

    Returns 401 when the caller is not logged in, 403 when the caller lacks
    view permission on "qualys" (same rule as the agent check endpoint), and
    404 when ``get_deploy_job`` knows no job with this id. Otherwise the
    payload carries the job counters, the per-server state, the elapsed time
    in whole seconds and the last 50 log lines.
    """
    from fastapi.responses import JSONResponse
    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    # Job logs expose deployment details: require Qualys view permission
    # instead of mere authentication.
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.agent_deploy_service import get_deploy_job
    job = get_deploy_job(job_id)
    if not job:
        return JSONResponse({"ok": False, "msg": "Job introuvable"}, status_code=404)

    import time
    elapsed = int(time.time() - job["started_at"])
    return JSONResponse({
        "ok": True,
        "job_id": job_id,
        "total": job["total"],
        "done": job["done"],
        "finished": job["finished"],
        "elapsed": elapsed,
        "servers": job["servers"],
        "log": job["log"][-50:],  # last 50 lines only
    })
@router.post("/qualys/deploy/check")
async def qualys_deploy_check(request: Request, db=Depends(get_db)):
    """Check the Qualys agent on the selected servers and return a summary.

    The request body must be a JSON object whose ``server_ids`` is a
    comma-separated string, a single number, or a list of ids. Each matching
    server is probed with ``check_agent`` and the individual results are
    returned together with counts for the ACTIVE, NOT_INSTALLED and
    CONNECTION_FAILED statuses.

    Returns 401 when unauthenticated, 403 without view permission on
    "qualys", 400 when the body is not a JSON object, and an
    ``{"ok": False}`` payload when no usable id was supplied.
    """
    from fastapi.responses import JSONResponse
    user = get_current_user(request)
    if not user:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    from ..services.agent_deploy_service import check_agent
    from ..services.secrets_service import get_secret

    # A malformed or non-object body is a client error, not a server crash.
    try:
        body = await request.json()
    except ValueError:
        body = None
    if not isinstance(body, dict):
        return JSONResponse({"ok": False, "msg": "Corps JSON invalide"}, status_code=400)

    # Accept either "1,2,3" or [1, 2, 3]. isdecimal() is used rather than
    # isdigit() because the latter accepts characters such as "²" that
    # int() rejects.
    server_ids = body.get("server_ids", "")
    if isinstance(server_ids, (list, tuple)):
        tokens = [str(item) for item in server_ids]
    else:
        tokens = str(server_ids).split(",")
    ids = [int(tok) for tok in (t.strip() for t in tokens) if tok.isdecimal()]
    if not ids:
        return JSONResponse({"ok": False, "msg": "Aucun serveur sélectionné"})

    ssh_key = get_secret(db, "ssh_key_file") or "/opt/patchcenter/keys/id_ed25519"

    # Safe to inline: every element of ``ids`` is an int produced above.
    placeholders = ",".join(str(i) for i in ids)
    servers = db.execute(text(f"SELECT id, hostname, ssh_user, ssh_port FROM servers WHERE id IN ({placeholders})")).fetchall()

    # NOTE(review): check_agent is called once per server, in sequence, from
    # an async handler. If it blocks on the SSH connection (to be confirmed)
    # it stalls the event loop; consider a thread pool.
    results = [
        check_agent(srv.hostname, srv.ssh_user or "root", ssh_key, srv.ssh_port or 22)
        for srv in servers
    ]

    def _count(status):
        return sum(1 for r in results if r["status"] == status)

    return JSONResponse({"ok": True, "results": results, "total": len(results),
                         "active": _count("ACTIVE"),
                         "not_installed": _count("NOT_INSTALLED"),
                         "failed": _count("CONNECTION_FAILED")})