1045 lines
46 KiB
Python
1045 lines
46 KiB
Python
"""Router Qualys — Tags, Recherche assets, Décodeur nomenclature"""
|
|
from fastapi import APIRouter, Request, Depends, Query, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
import csv, io, re, os
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
|
|
from ..services.qualys_service import (
|
|
sync_server_qualys, search_assets_api, get_all_tags_api,
|
|
create_tag_api, delete_tag_api, add_tag_to_asset_api,
|
|
remove_tag_from_asset_api, resync_all_tags, get_vuln_counts,
|
|
get_activation_keys, get_agents_summary, invalidate_search_cache, get_cache_stats,
|
|
)
|
|
from ..config import APP_NAME
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
# Nomenclature maps (from server_decoder.py)
|
|
ENV_MAP = {'p': ('Production', 'ENV-PRD'), 'r': ('Recette', 'ENV-REC'), 'i': ('Pré-Prod', 'ENV-PPR'),
|
|
'v': ('Test', 'ENV-TST'), 'd': ('Développement', 'ENV-DEV'), 't': ('Test', 'ENV-TST'),
|
|
's': ('Production', 'ENV-PRD'), 'o': ('Pré-Prod', 'ENV-PPR')}
|
|
DOMAIN_MAP = {
|
|
'bot': ('Flux Libre', 'DOM-FL'), 'boo': ('Flux Libre', 'DOM-FL'), 'boc': ('Flux Libre', 'DOM-FL'),
|
|
'afl': ('Flux Libre', 'DOM-FL'), 'sup': ('Flux Libre', 'DOM-FL'),
|
|
'dsi': ('Infrastructure', 'DOM-INF'), 'cyb': ('Infrastructure', 'DOM-INF'),
|
|
'vsa': ('Infrastructure', 'DOM-INF'), 'iad': ('Infrastructure', 'DOM-INF'),
|
|
'bur': ('Infrastructure', 'DOM-INF'), 'aii': ('Infrastructure', 'DOM-INF'),
|
|
'ecm': ('Infrastructure', 'DOM-INF'), 'log': ('Infrastructure', 'DOM-INF'),
|
|
'bck': ('Infrastructure', 'DOM-INF'), 'gaw': ('Infrastructure', 'DOM-INF'),
|
|
'vid': ('Infrastructure', 'DOM-INF'), 'sim': ('Infrastructure', 'DOM-INF'),
|
|
'emv': ('EMV', 'TAG-EMV'), 'pci': ('EMV', 'TAG-EMV'),
|
|
'pea': ('Péage', 'DOM-PEA'), 'osa': ('Péage', 'DOM-PEA'), 'svp': ('Péage', 'DOM-PEA'),
|
|
'adv': ('Péage', 'DOM-PEA'), 'rpa': ('Péage', 'DOM-PEA'),
|
|
'ame': ('Trafic', 'DOM-TRA'), 'tra': ('Trafic', 'DOM-TRA'), 'dai': ('Trafic', 'DOM-TRA'),
|
|
'pat': ('Trafic', 'DOM-TRA'), 'dep': ('Trafic', 'DOM-TRA'), 'exp': ('Trafic', 'DOM-TRA'),
|
|
'sig': ('Trafic', 'DOM-TRA'), 'rau': ('Trafic', 'DOM-TRA'),
|
|
'dec': ('BI', 'DOM-BI'), 'sas': ('BI', 'DOM-BI'), 'bip': ('BI', 'DOM-BI'),
|
|
'int': ('Gestion', 'DOM-GES'), 'agt': ('Gestion', 'DOM-GES'), 'pin': ('Gestion', 'DOM-GES'),
|
|
'ech': ('Gestion', 'DOM-GES'),
|
|
}
|
|
AUTO_PREFIXES = ('ENV-', 'OS-', 'DOM-', 'TYP-', 'TAG-OBS', 'TAG-EMV')
|
|
|
|
|
|
def _save_api_results_to_db(db, assets):
|
|
"""Sauvegarde les résultats de recherche API dans la base locale"""
|
|
for a in assets:
|
|
if not a.get("qualys_asset_id"):
|
|
continue
|
|
hostname = (a.get("hostname") or a.get("name", "")).split(".")[0].lower()
|
|
os_val = a.get("os", "")
|
|
os_family = "linux" if any(k in os_val.lower() for k in ("linux", "red hat", "centos")) else "windows" if "windows" in os_val.lower() else None
|
|
|
|
srv = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
|
|
{"h": hostname}).fetchone()
|
|
server_id = srv.id if srv else None
|
|
|
|
db.execute(text("""
|
|
INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
|
|
agent_status, agent_version, last_checkin, server_id)
|
|
VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
|
|
ON CONFLICT (qualys_asset_id) DO UPDATE SET
|
|
name=EXCLUDED.name, fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address,
|
|
os=EXCLUDED.os, os_family=EXCLUDED.os_family,
|
|
agent_status=EXCLUDED.agent_status, agent_version=EXCLUDED.agent_version,
|
|
last_checkin=EXCLUDED.last_checkin, server_id=COALESCE(EXCLUDED.server_id, qualys_assets.server_id),
|
|
updated_at=now()
|
|
"""), {
|
|
"qid": a["qualys_asset_id"], "name": a.get("name"), "hn": hostname,
|
|
"fqdn": a.get("fqdn") or None, "ip": a.get("ip_address") or None,
|
|
"os": os_val, "osf": os_family,
|
|
"ast": a.get("agent_status", ""), "av": a.get("agent_version", ""),
|
|
"lc": a.get("last_checkin") or None, "sid": server_id,
|
|
})
|
|
|
|
if a.get("tags"):
|
|
db.execute(text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"),
|
|
{"qid": a["qualys_asset_id"]})
|
|
for tag_name in a["tags"]:
|
|
tag_row = db.execute(text("SELECT qualys_tag_id FROM qualys_tags WHERE name = :n"),
|
|
{"n": tag_name}).fetchone()
|
|
if tag_row:
|
|
db.execute(text("""
|
|
INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
|
|
VALUES (:qid, :tid) ON CONFLICT DO NOTHING
|
|
"""), {"qid": a["qualys_asset_id"], "tid": tag_row.qualys_tag_id})
|
|
|
|
db.commit()
|
|
|
|
|
|
def _decode_hostname(hostname):
|
|
"""Décode un hostname SANEF et retourne les tags suggérés"""
|
|
hn = hostname.lower().strip()
|
|
if hn.startswith('ls-'):
|
|
return {'type': 'Physique', 'env': 'Production', 'domain': 'Péage', 'tags': ['ENV-PRD', 'DOM-PEA', 'TYP-SRV', 'OS-WIN']}
|
|
|
|
if len(hn) < 4:
|
|
return {'type': '?', 'env': '?', 'domain': '?', 'tags': []}
|
|
|
|
# Type
|
|
machine = 'VM' if hn[0] == 'v' else 'Physique'
|
|
eqt = 'TYP-VIR' if hn[0] == 'v' else 'TYP-SRV'
|
|
|
|
# Env
|
|
env_info = ENV_MAP.get(hn[1], ('?', None))
|
|
env_name, env_tag = env_info
|
|
|
|
# Domain (try 4, 3, 2 char prefixes)
|
|
rest = hn[2:]
|
|
domain_name, domain_tag = '?', None
|
|
for length in (4, 3, 2):
|
|
prefix = rest[:length]
|
|
if prefix in DOMAIN_MAP:
|
|
domain_name, domain_tag = DOMAIN_MAP[prefix]
|
|
break
|
|
|
|
tags = [t for t in [env_tag, domain_tag, eqt] if t]
|
|
return {'type': machine, 'env': env_name, 'domain': domain_name, 'tags': tags}
|
|
|
|
|
|
# === TAGS ===
|
|
|
|
@router.get("/qualys/tags", response_class=HTMLResponse)
|
|
async def qualys_tags(request: Request, db=Depends(get_db),
|
|
search: str = Query(None), tag_type: str = Query(None)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
where = ["1=1"]
|
|
params = {}
|
|
if search:
|
|
where.append("qt.name ILIKE :q"); params["q"] = f"%{search}%"
|
|
if tag_type == "dyn":
|
|
where.append("qt.is_dynamic = true")
|
|
elif tag_type == "stat":
|
|
where.append("qt.is_dynamic = false")
|
|
wc = " AND ".join(where)
|
|
|
|
tags = db.execute(text(f"""
|
|
SELECT qt.*,
|
|
(SELECT COUNT(*) FROM qualys_asset_tags qat WHERE qat.qualys_tag_id = qt.qualys_tag_id) as asset_count
|
|
FROM qualys_tags qt WHERE {wc} ORDER BY qt.name
|
|
"""), params).fetchall()
|
|
|
|
stats = db.execute(text("""
|
|
SELECT COUNT(*) as total,
|
|
COUNT(*) FILTER (WHERE is_dynamic) as dyn,
|
|
COUNT(*) FILTER (WHERE NOT is_dynamic) as stat
|
|
FROM qualys_tags
|
|
""")).fetchone()
|
|
|
|
from ..services.qualys_service import is_refresh_running
|
|
sync_running = is_refresh_running()
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "tags": tags, "stats": stats,
|
|
"search": search, "tag_type": tag_type,
|
|
"can_edit_qualys": can_edit(perms, "qualys"),
|
|
"msg": request.query_params.get("msg"),
|
|
"sync_running": sync_running,
|
|
})
|
|
return templates.TemplateResponse("qualys_tags.html", ctx)
|
|
|
|
|
|
@router.post("/qualys/tags/resync")
|
|
async def qualys_tags_resync(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
# Bloque si une sync agents tourne deja (meme API Qualys)
|
|
from ..services.qualys_service import is_refresh_running
|
|
if is_refresh_running():
|
|
return RedirectResponse(url="/qualys/tags?msg=busy", status_code=303)
|
|
result = resync_all_tags(db)
|
|
if result.get("ok"):
|
|
msg = "resync_ok"
|
|
else:
|
|
# Encode le message d'erreur reel pour affichage
|
|
err = (result.get("msg") or "Erreur API Qualys").replace(" ", "+")[:120]
|
|
msg = f"resync_ko_{err}"
|
|
return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
|
|
|
|
|
|
@router.post("/qualys/tags/create")
|
|
async def qualys_tag_create(request: Request, db=Depends(get_db),
|
|
tag_name: str = Form(...)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
result = create_tag_api(db, tag_name.strip())
|
|
msg = "created" if result["ok"] else "create_error"
|
|
return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
|
|
|
|
|
|
@router.post("/qualys/tags/{tag_id}/delete")
|
|
async def qualys_tag_delete(request: Request, tag_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
result = delete_tag_api(db, tag_id)
|
|
msg = "deleted" if result["ok"] else "delete_error"
|
|
return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
|
|
|
|
|
|
@router.post("/qualys/asset/{asset_id}/tag/add")
|
|
async def qualys_asset_tag_add(request: Request, asset_id: int, db=Depends(get_db),
|
|
tag_id: str = Form(...)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
result = add_tag_to_asset_api(db, asset_id, int(tag_id))
|
|
color = "text-cyber-green" if result["ok"] else "text-cyber-red"
|
|
return HTMLResponse(f'<span class="text-xs {color}">{result["msg"]}</span>')
|
|
|
|
|
|
@router.post("/qualys/asset/{asset_id}/tag/remove")
|
|
async def qualys_asset_tag_remove(request: Request, asset_id: int, db=Depends(get_db),
|
|
tag_id: str = Form(...)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
result = remove_tag_from_asset_api(db, asset_id, int(tag_id))
|
|
color = "text-cyber-green" if result["ok"] else "text-cyber-red"
|
|
return HTMLResponse(f'<span class="text-xs {color}">{result["msg"]}</span>')
|
|
|
|
|
|
def _bulk_return_url(form, msg):
|
|
"""Construit l'URL de retour avec la recherche conservée"""
|
|
s = form.get("return_search", "")
|
|
f = form.get("return_field", "hostname")
|
|
return f"/qualys/search?field={f}&search={s}&msg={msg}"
|
|
|
|
|
|
@router.post("/qualys/bulk/add-tag")
|
|
async def qualys_bulk_add_tag(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
form = await request.form()
|
|
ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdigit()]
|
|
tid = int(form.get("tag_id", "0") or "0")
|
|
ok = 0; ko = 0
|
|
for aid in ids:
|
|
r = add_tag_to_asset_api(db, aid, tid)
|
|
if r["ok"]: ok += 1
|
|
else: ko += 1
|
|
return RedirectResponse(url=_bulk_return_url(form, f"bulk_add_{ok}_{ko}"), status_code=303)
|
|
|
|
|
|
@router.post("/qualys/bulk/remove-tag")
|
|
async def qualys_bulk_remove_tag(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
form = await request.form()
|
|
ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdigit()]
|
|
tid = int(form.get("tag_id", "0") or "0")
|
|
ok = 0; ko = 0
|
|
for aid in ids:
|
|
r = remove_tag_from_asset_api(db, aid, tid)
|
|
if r["ok"]: ok += 1
|
|
else: ko += 1
|
|
return RedirectResponse(url=_bulk_return_url(form, f"bulk_rm_{ok}_{ko}"), status_code=303)
|
|
|
|
|
|
@router.post("/qualys/resync-assets")
|
|
async def qualys_resync_assets(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/search")
|
|
form = await request.form()
|
|
ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdigit()]
|
|
ok = 0
|
|
for aid in ids:
|
|
result = search_assets_api(db, str(aid), field="id", operator="EQUALS")
|
|
if result.get("ok") and result.get("assets"):
|
|
_save_api_results_to_db(db, result["assets"])
|
|
ok += 1
|
|
return RedirectResponse(url=_bulk_return_url(form, f"resync_{ok}"), status_code=303)
|
|
|
|
|
|
@router.get("/qualys/bulk/tags-for-assets")
|
|
async def qualys_tags_for_assets(request: Request, db=Depends(get_db),
|
|
asset_ids: str = Query("")):
|
|
"""Retourne les tags STAT assignés aux assets sélectionnés (JSON)"""
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse([])
|
|
ids = [int(x) for x in asset_ids.split(",") if x.strip().isdigit()]
|
|
if not ids:
|
|
return JSONResponse([])
|
|
|
|
placeholders = ",".join([f":id{i}" for i in range(len(ids))])
|
|
params = {f"id{i}": v for i, v in enumerate(ids)}
|
|
|
|
rows = db.execute(text(f"""
|
|
SELECT qt.qualys_tag_id as id, qt.name, COUNT(DISTINCT qat.qualys_asset_id) as count
|
|
FROM qualys_asset_tags qat
|
|
JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
|
|
WHERE qat.qualys_asset_id IN ({placeholders}) AND qt.is_dynamic = false
|
|
GROUP BY qt.qualys_tag_id, qt.name
|
|
ORDER BY qt.name
|
|
"""), params).fetchall()
|
|
|
|
return JSONResponse([{"id": r.id, "name": r.name, "count": r.count} for r in rows])
|
|
|
|
|
|
@router.get("/qualys/tags/export")
|
|
async def qualys_tags_export(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/tags")
|
|
tags = db.execute(text("SELECT * FROM qualys_tags ORDER BY name")).fetchall()
|
|
output = io.StringIO()
|
|
writer = csv.writer(output, delimiter=";")
|
|
writer.writerow(["Nom", "ID Qualys", "Type", "Nb assets"])
|
|
for t in tags:
|
|
writer.writerow([t.name, t.qualys_tag_id, "DYN" if t.is_dynamic else "STAT", ""])
|
|
output.seek(0)
|
|
return StreamingResponse(iter(["\ufeff" + output.getvalue()]), media_type="text/csv",
|
|
headers={"Content-Disposition": "attachment; filename=qualys_tags.csv"})
|
|
|
|
|
|
# === RECHERCHE ASSETS ===
|
|
|
|
@router.get("/qualys/search", response_class=HTMLResponse)
|
|
async def qualys_search(request: Request, db=Depends(get_db),
|
|
search: str = Query(None), field: str = Query("hostname")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
assets = []
|
|
api_msg = None
|
|
source = None
|
|
force = request.query_params.get("force", "") == "1"
|
|
cache_info = get_cache_stats()
|
|
|
|
if search:
|
|
from datetime import datetime, timedelta
|
|
cutoff = datetime.now() - timedelta(hours=24)
|
|
|
|
if force:
|
|
fresh = 0 # Forcer le resync API
|
|
elif field == "hostname":
|
|
fresh = db.execute(text("""
|
|
SELECT COUNT(*) FROM qualys_assets
|
|
WHERE (hostname ILIKE :q OR name ILIKE :q OR fqdn ILIKE :q)
|
|
AND updated_at > :cutoff
|
|
"""), {"q": f"%{search}%", "cutoff": cutoff}).scalar()
|
|
elif field == "ip":
|
|
fresh = db.execute(text("""
|
|
SELECT COUNT(*) FROM qualys_assets
|
|
WHERE CAST(ip_address AS TEXT) LIKE :q AND updated_at > :cutoff
|
|
"""), {"q": f"{search}%", "cutoff": cutoff}).scalar()
|
|
elif field == "tag":
|
|
fresh = db.execute(text("""
|
|
SELECT COUNT(*) FROM qualys_assets qa
|
|
WHERE qa.qualys_asset_id IN (
|
|
SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
|
|
JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
|
|
WHERE qt.name ILIKE :q
|
|
) AND qa.updated_at > :cutoff
|
|
"""), {"q": f"%{search}%", "cutoff": cutoff}).scalar()
|
|
else:
|
|
fresh = 0
|
|
|
|
if fresh > 0:
|
|
# Données fraiches en base — pas d'appel API
|
|
source = "base (< 24h)"
|
|
else:
|
|
# Appel API (cache 10min) + stockage en base
|
|
if field == "hostname":
|
|
result = search_assets_api(db, search, field="name", operator="CONTAINS", force_refresh=force)
|
|
elif field == "ip":
|
|
result = search_assets_api(db, search, field="address", operator="CONTAINS", force_refresh=force)
|
|
elif field == "tag":
|
|
result = search_assets_api(db, search, field="tagName", operator="EQUALS", force_refresh=force)
|
|
else:
|
|
result = {"ok": False, "msg": "Champ inconnu", "assets": []}
|
|
|
|
if result.get("ok") and result.get("assets"):
|
|
_save_api_results_to_db(db, result["assets"])
|
|
source = f"API Qualys ({len(result['assets'])} résultats)"
|
|
elif not result.get("ok"):
|
|
source = f"Erreur API: {result.get('msg', '?')}"
|
|
# Fallback sur la base meme si pas frais
|
|
source += " — fallback base"
|
|
else:
|
|
source = "API: aucun résultat"
|
|
|
|
# Lire depuis la base dans tous les cas
|
|
where_db = ["1=1"]; params_db = {}
|
|
if field == "hostname":
|
|
where_db.append("(qa.hostname ILIKE :q OR qa.name ILIKE :q OR qa.fqdn ILIKE :q)")
|
|
params_db["q"] = f"%{search}%"
|
|
elif field == "ip":
|
|
where_db.append("CAST(qa.ip_address AS TEXT) LIKE :q")
|
|
params_db["q"] = f"{search}%"
|
|
elif field == "tag":
|
|
where_db.append("""qa.qualys_asset_id IN (
|
|
SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
|
|
JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
|
|
WHERE qt.name ILIKE :q)""")
|
|
params_db["q"] = f"%{search}%"
|
|
wc_db = " AND ".join(where_db)
|
|
|
|
assets = db.execute(text(f"""
|
|
SELECT qa.*, s.hostname as srv_hostname, s.tier,
|
|
(SELECT string_agg(qt.name, ', ' ORDER BY qt.name)
|
|
FROM qualys_asset_tags qat JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
|
|
WHERE qat.qualys_asset_id = qa.qualys_asset_id) as tags_list
|
|
FROM qualys_assets qa
|
|
LEFT JOIN servers s ON qa.server_id = s.id
|
|
WHERE {wc_db} ORDER BY qa.hostname LIMIT 200
|
|
"""), params_db).fetchall()
|
|
|
|
api_msg = f"{len(assets)} résultat(s) — source: {source}"
|
|
|
|
# Enrichir avec vulnérabilités (severity 3,4,5, Confirmed/Potential, Active)
|
|
vuln_map = {}
|
|
if assets:
|
|
ips = [str(a.ip_address) for a in assets if a.ip_address]
|
|
ips = [ip for ip in ips if ip and ip != "None"]
|
|
if ips:
|
|
try:
|
|
vuln_map = get_vuln_counts(db, ",".join(ips[:50]), force_refresh=force)
|
|
except Exception:
|
|
pass
|
|
|
|
all_tags = db.execute(text("SELECT qualys_tag_id, name FROM qualys_tags ORDER BY name")).fetchall()
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "assets": assets, "search": search,
|
|
"field": field, "api_msg": api_msg,
|
|
"all_tags": all_tags, "vuln_map": vuln_map,
|
|
"cache_info": cache_info,
|
|
"can_edit_qualys": can_edit(perms, "qualys"),
|
|
"msg": request.query_params.get("msg"),
|
|
})
|
|
return templates.TemplateResponse("qualys_search.html", ctx)
|
|
|
|
|
|
@router.get("/qualys/agents", response_class=HTMLResponse)
|
|
def qualys_agents_page(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
try:
|
|
keys = get_activation_keys(db)
|
|
except Exception:
|
|
keys = []
|
|
try:
|
|
summary = get_agents_summary(db)
|
|
except Exception:
|
|
summary = {"statuses": [], "versions": [], "total_assets": 0, "active": 0, "inactive": 0}
|
|
|
|
# Serveurs en prod sans agent Qualys
|
|
no_agent_rows = db.execute(text("""
|
|
SELECT s.hostname, s.os_family, s.etat, d.name as domain, e.name as env, z.name as zone
|
|
FROM servers s
|
|
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
|
|
LEFT JOIN domains d ON de.domain_id = d.id
|
|
LEFT JOIN environments e ON de.environment_id = e.id
|
|
LEFT JOIN zones z ON s.zone_id = z.id
|
|
WHERE NOT EXISTS (SELECT 1 FROM qualys_assets qa WHERE qa.hostname = s.hostname)
|
|
ORDER BY s.hostname
|
|
""")).fetchall()
|
|
no_agent = [{"hostname": r.hostname, "os_family": r.os_family, "etat": r.etat,
|
|
"domain": r.domain or "", "env": r.env or "", "zone": r.zone or ""} for r in no_agent_rows]
|
|
|
|
# Agents inactifs
|
|
inactive_rows = db.execute(text("""
|
|
SELECT qa.hostname, qa.os, qa.agent_version, qa.last_checkin, s.etat
|
|
FROM qualys_assets qa
|
|
LEFT JOIN servers s ON qa.server_id = s.id
|
|
WHERE qa.agent_status ILIKE '%inactive%'
|
|
ORDER BY qa.hostname
|
|
""")).fetchall()
|
|
inactive = [{"hostname": r.hostname, "os": r.os, "agent_version": r.agent_version,
|
|
"last_checkin": r.last_checkin, "etat": r.etat or ""} for r in inactive_rows]
|
|
|
|
from ..services.qualys_service import is_refresh_running
|
|
sync_running = is_refresh_running()
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "keys": keys, "summary": summary,
|
|
"no_agent_servers": no_agent, "inactive_agents": inactive,
|
|
"msg": request.query_params.get("msg", ""),
|
|
"sync_running": sync_running,
|
|
})
|
|
return templates.TemplateResponse("qualys_agents.html", ctx)
|
|
|
|
|
|
@router.post("/qualys/agents/refresh")
|
|
def qualys_agents_refresh(request: Request, db=Depends(get_db), mode: str = "diff"):
|
|
"""Sync Qualys. mode=diff (rapide, depuis dernier sync) ou full (complet)."""
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
from ..services.qualys_service import refresh_all_agents
|
|
try:
|
|
stats = refresh_all_agents(db, mode=mode)
|
|
if stats.get("busy"):
|
|
return JSONResponse(stats, status_code=409)
|
|
if not stats.get("ok"):
|
|
return JSONResponse(stats, status_code=500)
|
|
return JSONResponse({"ok": True, "msg": stats.get("msg") or f"{stats.get('created',0)} créés, {stats.get('updated',0)} mis à jour", "stats": stats})
|
|
except Exception as e:
|
|
import traceback; traceback.print_exc()
|
|
return JSONResponse({"ok": False, "msg": str(e)[:200]}, status_code=500)
|
|
|
|
|
|
@router.post("/qualys/agents/cancel")
|
|
def qualys_agents_cancel(request: Request, db=Depends(get_db)):
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return JSONResponse({"ok": False}, status_code=403)
|
|
from ..services.qualys_service import cancel_refresh, is_refresh_running
|
|
if not is_refresh_running():
|
|
return JSONResponse({"ok": False, "msg": "Aucun refresh en cours"}, status_code=404)
|
|
return JSONResponse(cancel_refresh())
|
|
|
|
|
|
@router.get("/qualys/agents/export-no-agent")
|
|
async def export_no_agent_csv(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/agents")
|
|
import io, csv as _csv
|
|
rows = db.execute(text("""
|
|
SELECT s.hostname, s.os_family, s.etat, d.name as domain, e.name as env, z.name as zone
|
|
FROM servers s
|
|
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
|
|
LEFT JOIN domains d ON de.domain_id = d.id
|
|
LEFT JOIN environments e ON de.environment_id = e.id
|
|
LEFT JOIN zones z ON s.zone_id = z.id
|
|
WHERE NOT EXISTS (SELECT 1 FROM qualys_assets qa WHERE qa.hostname = s.hostname)
|
|
ORDER BY s.hostname
|
|
""")).fetchall()
|
|
output = io.StringIO()
|
|
w = _csv.writer(output, delimiter=";")
|
|
w.writerow(["Hostname", "OS", "Domaine", "Environnement", "Zone", "Etat"])
|
|
for r in rows:
|
|
w.writerow([r.hostname, r.os_family or "", r.domain or "", r.env or "", r.zone or "", r.etat or ""])
|
|
output.seek(0)
|
|
return StreamingResponse(
|
|
iter(["\ufeff" + output.getvalue()]),
|
|
media_type="text/csv",
|
|
headers={"Content-Disposition": "attachment; filename=serveurs_sans_agent.csv"})
|
|
|
|
|
|
@router.get("/qualys/agents/export-inactive")
|
|
async def export_inactive_csv(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/qualys/agents")
|
|
import io, csv as _csv
|
|
rows = db.execute(text("""
|
|
SELECT qa.hostname, qa.os, qa.agent_version, qa.last_checkin, s.etat
|
|
FROM qualys_assets qa LEFT JOIN servers s ON qa.server_id = s.id
|
|
WHERE qa.agent_status ILIKE '%inactive%' ORDER BY qa.hostname
|
|
""")).fetchall()
|
|
output = io.StringIO()
|
|
w = _csv.writer(output, delimiter=";")
|
|
w.writerow(["Hostname", "OS", "Version agent", "Dernier check-in", "Etat"])
|
|
for r in rows:
|
|
lc = str(r.last_checkin)[:10] if r.last_checkin else ""
|
|
w.writerow([r.hostname, r.os or "", r.agent_version or "", lc, r.etat or ""])
|
|
output.seek(0)
|
|
return StreamingResponse(
|
|
iter(["\ufeff" + output.getvalue()]),
|
|
media_type="text/csv",
|
|
headers={"Content-Disposition": "attachment; filename=agents_inactifs.csv"})
|
|
|
|
|
|
@router.get("/qualys/vulns/{ip}", response_class=HTMLResponse)
|
|
async def qualys_vulns_detail(request: Request, ip: str, db=Depends(get_db)):
|
|
"""Retourne le detail des vulns severity 3,4,5 pour une IP (fragment HTMX)"""
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return HTMLResponse("<p>Non autorise</p>", status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return HTMLResponse("<p>Acces interdit</p>", status_code=403)
|
|
|
|
# Cache 10 min
|
|
from ..services import cache as _cache
|
|
cache_key = f"qualys:vulndetail:{ip}"
|
|
cached_html = _cache.get(cache_key)
|
|
if cached_html is not None:
|
|
return HTMLResponse(cached_html + '<p class="text-xs text-gray-600 mt-1">(cache 10 min)</p>')
|
|
|
|
from ..services.qualys_service import _get_qualys_creds, parse_xml
|
|
import requests as _req, urllib3, re as _re
|
|
urllib3.disable_warnings()
|
|
|
|
qualys_url, qualys_user, qualys_pass, qualys_proxy = _get_qualys_creds(db)
|
|
proxies = {"https": qualys_proxy, "http": qualys_proxy} if qualys_proxy else None
|
|
|
|
def parse_cdata(txt, tag):
|
|
import re
|
|
m = re.search(rf'<{tag}>\s*(?:<!\[CDATA\[)?(.*?)(?:\]\]>)?\s*</{tag}>', txt, re.DOTALL)
|
|
return m.group(1).strip() if m else ""
|
|
|
|
# 1. Detections
|
|
try:
|
|
r = _req.post(f"{qualys_url}/api/2.0/fo/asset/host/vm/detection/",
|
|
data={"action": "list", "ips": ip, "severities": "3,4,5",
|
|
"status": "New,Active,Re-Opened", "show_results": "1", "output_format": "XML"},
|
|
auth=(qualys_user, qualys_pass), verify=False, timeout=90, proxies=proxies,
|
|
headers={"X-Requested-With": "Python"})
|
|
except Exception as e:
|
|
return HTMLResponse(f"<p class='text-cyber-red'>Erreur API: {e}</p>")
|
|
|
|
detections = []
|
|
qids = []
|
|
for det in r.text.split("<DETECTION>")[1:]:
|
|
det = det.split("</DETECTION>")[0]
|
|
qid = (parse_xml(det, "QID") or [""])[0]
|
|
sev = (parse_xml(det, "SEVERITY") or [""])[0]
|
|
dtype = (parse_xml(det, "TYPE") or [""])[0]
|
|
status = (parse_xml(det, "STATUS") or [""])[0]
|
|
results = parse_cdata(det, "RESULTS")
|
|
sev_i = int(sev) if sev.isdigit() else 0
|
|
if sev_i < 3 or dtype not in ("Confirmed", "Potential"):
|
|
continue
|
|
detections.append({"qid": qid, "severity": sev_i, "type": dtype, "status": status, "results": results})
|
|
if qid not in qids:
|
|
qids.append(qid)
|
|
|
|
if not detections:
|
|
return HTMLResponse("<p class='text-gray-500 p-4'>Aucune vulnerabilite active (severity 3+)</p>")
|
|
|
|
# 2. KB pour titre, CVE, description
|
|
kb = {}
|
|
if qids:
|
|
try:
|
|
r2 = _req.post(f"{qualys_url}/api/2.0/fo/knowledge_base/vuln/",
|
|
data={"action": "list", "ids": ",".join(qids)},
|
|
auth=(qualys_user, qualys_pass), verify=False, timeout=90, proxies=proxies,
|
|
headers={"X-Requested-With": "Python"})
|
|
for vuln_block in r2.text.split("<VULN>")[1:]:
|
|
vuln_block = vuln_block.split("</VULN>")[0]
|
|
qid = (parse_xml(vuln_block, "QID") or [""])[0]
|
|
title = parse_cdata(vuln_block, "TITLE")
|
|
diagnosis = parse_cdata(vuln_block, "DIAGNOSIS")
|
|
solution = parse_cdata(vuln_block, "SOLUTION")
|
|
consequence = parse_cdata(vuln_block, "CONSEQUENCE")
|
|
# CVEs
|
|
cves = []
|
|
if "<CVE_LIST>" in vuln_block:
|
|
cve_block = vuln_block.split("<CVE_LIST>")[1].split("</CVE_LIST>")[0]
|
|
cve_ids = _re.findall(r'<!\[CDATA\[(CVE-[\d-]+)\]\]>', cve_block)
|
|
if not cve_ids:
|
|
cve_ids = parse_xml(cve_block, "ID")
|
|
cves = cve_ids
|
|
# CVSS
|
|
cvss = ""
|
|
if "<CVSS_V3>" in vuln_block:
|
|
cvss = (parse_xml(vuln_block.split("<CVSS_V3>")[1].split("</CVSS_V3>")[0], "BASE") or [""])[0]
|
|
kb[qid] = {"title": title, "diagnosis": diagnosis, "solution": solution,
|
|
"consequence": consequence, "cves": cves, "cvss3": cvss}
|
|
except Exception:
|
|
pass
|
|
|
|
# 3. Construire HTML
|
|
html = f'<div class="p-4"><h3 class="text-sm font-bold text-cyber-accent mb-3">{len(detections)} vulnerabilite(s) — {ip}</h3>'
|
|
html += '<table class="w-full table-cyber text-xs"><thead><tr>'
|
|
html += '<th class="p-2">Sev</th><th class="p-2">QID</th><th class="text-left p-2">Titre</th>'
|
|
html += '<th class="p-2">Type</th><th class="p-2">CVE</th><th class="p-2">CVSS3</th>'
|
|
html += '</tr></thead><tbody>'
|
|
|
|
for d in sorted(detections, key=lambda x: -x["severity"]):
|
|
qid = d["qid"]
|
|
k = kb.get(qid, {})
|
|
sev_class = "badge-red" if d["severity"] >= 5 else ("badge-yellow" if d["severity"] >= 4 else "badge-gray")
|
|
cve_links = " ".join([f'<a href="https://nvd.nist.gov/vuln/detail/{c}" target="_blank" class="text-cyber-accent hover:underline">{c}</a>' for c in k.get("cves", [])])
|
|
|
|
html += f'<tr>'
|
|
html += f'<td class="p-2 text-center"><span class="badge {sev_class}">{d["severity"]}</span></td>'
|
|
html += f'<td class="p-2 text-center font-mono">{qid}</td>'
|
|
html += f'<td class="p-2 text-cyber-accent">{k.get("title", "N/A")}</td>'
|
|
html += f'<td class="p-2 text-center">{d["type"]}</td>'
|
|
html += f'<td class="p-2">{cve_links or "-"}</td>'
|
|
html += f'<td class="p-2 text-center font-bold">{k.get("cvss3", "-")}</td>'
|
|
html += '</tr>'
|
|
|
|
# Ligne detail
|
|
html += f'<tr class="bg-cyber-hover/30"><td colspan="6" class="p-2 text-xs text-gray-400">'
|
|
|
|
# Parser le results pour en faire un tableau packages
|
|
results = d.get("results", "")
|
|
if results and ("Installed Version" in results or "Required Version" in results):
|
|
# Format: "Package Installed Version Required Version\npkg1 ver1 ver2\npkg2..."
|
|
lines = [l.strip() for l in results.replace("\t", " ").split("\n") if l.strip()]
|
|
# Retirer la ligne header
|
|
pkg_lines = [l for l in lines if not l.startswith("Package") and not l.startswith("Installed")]
|
|
if pkg_lines:
|
|
html += '<table class="w-full mt-1 mb-2" style="background:#0d1117;border-radius:4px;">'
|
|
html += '<thead><tr><th class="p-1 text-left text-gray-500">Package</th>'
|
|
html += '<th class="p-1 text-left text-cyber-red">Version installée</th>'
|
|
html += '<th class="p-1 text-left text-cyber-green">Version requise</th></tr></thead><tbody>'
|
|
for line in pkg_lines:
|
|
parts = line.split()
|
|
if len(parts) >= 3:
|
|
pkg = parts[0]
|
|
installed = parts[1]
|
|
required = " ".join(parts[2:])
|
|
html += f'<tr><td class="p-1 font-mono text-cyber-accent">{pkg}</td>'
|
|
html += f'<td class="p-1 font-mono text-cyber-red">{installed}</td>'
|
|
html += f'<td class="p-1 font-mono text-cyber-green">{required}</td></tr>'
|
|
elif len(parts) >= 1:
|
|
html += f'<tr><td colspan="3" class="p-1 font-mono">{line}</td></tr>'
|
|
html += '</tbody></table>'
|
|
else:
|
|
html += f'<b>Résultat :</b> <span class="font-mono">{results[:300]}</span><br>'
|
|
elif results:
|
|
html += f'<b>Résultat :</b> <span class="font-mono">{results[:300]}</span><br>'
|
|
|
|
# Diagnosis (nettoyé des tags HTML)
|
|
diag = k.get("diagnosis", "")
|
|
if diag:
|
|
diag_clean = _re.sub(r'<[^>]+>', ' ', diag).strip()[:300]
|
|
html += f'<b>Détection :</b> {diag_clean}<br>'
|
|
|
|
if k.get("solution"):
|
|
sol_clean = _re.sub(r'<[^>]+>', ' ', k["solution"]).strip()[:200]
|
|
html += f'<b>Solution :</b> {sol_clean}'
|
|
|
|
html += '</td></tr>'
|
|
|
|
html += '</tbody></table></div>'
|
|
_cache.set(cache_key, html, 600)
|
|
return HTMLResponse(html)
|
|
|
|
|
|
@router.get("/qualys/asset/{asset_id}", response_class=HTMLResponse)
|
|
async def qualys_asset_detail(request: Request, asset_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return HTMLResponse("<p>Non autorisé</p>", status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return HTMLResponse("<p>Acces interdit</p>", status_code=403)
|
|
|
|
asset = db.execute(text("SELECT * FROM qualys_assets WHERE qualys_asset_id = :aid"),
|
|
{"aid": asset_id}).fetchone()
|
|
if not asset:
|
|
return HTMLResponse("<p>Asset non trouvé</p>")
|
|
|
|
tags = db.execute(text("""
|
|
SELECT qt.name, qt.is_dynamic, qt.qualys_tag_id
|
|
FROM qualys_asset_tags qat JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
|
|
WHERE qat.qualys_asset_id = :aid ORDER BY qt.name
|
|
"""), {"aid": asset_id}).fetchall()
|
|
|
|
decoded = _decode_hostname(asset.hostname or asset.name or "")
|
|
|
|
all_tags = db.execute(text(
|
|
"SELECT qualys_tag_id, name, is_dynamic FROM qualys_tags ORDER BY name"
|
|
)).fetchall()
|
|
|
|
return templates.TemplateResponse("partials/qualys_asset_detail.html", {
|
|
"request": request, "a": asset, "tags": tags, "decoded": decoded, "all_tags": all_tags,
|
|
})
|
|
|
|
|
|
@router.get("/qualys/search/export")
|
|
async def qualys_search_export(request: Request, db=Depends(get_db),
|
|
search: str = Query(""), field: str = Query("hostname")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
# Re-execute la recherche
|
|
where = ["1=1"]; params = {}
|
|
if search:
|
|
if field == "hostname":
|
|
where.append("(qa.hostname ILIKE :q OR qa.name ILIKE :q OR qa.fqdn ILIKE :q)")
|
|
params["q"] = f"%{search}%"
|
|
elif field == "ip":
|
|
where.append("CAST(qa.ip_address AS TEXT) LIKE :q"); params["q"] = f"{search}%"
|
|
wc = " AND ".join(where)
|
|
assets = db.execute(text(f"""
|
|
SELECT qa.hostname, qa.ip_address, qa.fqdn, qa.os, qa.agent_status,
|
|
qa.agent_version, qa.last_checkin, qa.qualys_asset_id
|
|
FROM qualys_assets qa WHERE {wc} ORDER BY qa.hostname LIMIT 1000
|
|
"""), params).fetchall()
|
|
output = io.StringIO()
|
|
writer = csv.writer(output, delimiter=";")
|
|
writer.writerow(["Hostname", "IP", "FQDN", "OS", "Agent", "Version", "Dernier check-in", "Qualys ID"])
|
|
for a in assets:
|
|
writer.writerow([a.hostname, a.ip_address, a.fqdn, a.os, a.agent_status,
|
|
a.agent_version, a.last_checkin, a.qualys_asset_id])
|
|
output.seek(0)
|
|
return StreamingResponse(iter(["\ufeff" + output.getvalue()]), media_type="text/csv",
|
|
headers={"Content-Disposition": f"attachment; filename=qualys_assets_{search or 'all'}.csv"})
|
|
|
|
|
|
# === DECODEUR ===
|
|
|
|
@router.get("/qualys/decoder", response_class=HTMLResponse)
|
|
async def qualys_decoder(request: Request, db=Depends(get_db),
|
|
hostname: str = Query(None)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
result = None
|
|
current_tags = []
|
|
if hostname:
|
|
result = _decode_hostname(hostname)
|
|
result["hostname"] = hostname
|
|
|
|
# Chercher les tags actuels dans Qualys
|
|
asset = db.execute(text(
|
|
"SELECT qualys_asset_id FROM qualys_assets WHERE LOWER(hostname) = LOWER(:h)"
|
|
), {"h": hostname.strip().split(".")[0].lower()}).fetchone()
|
|
if asset:
|
|
rows = db.execute(text("""
|
|
SELECT qt.name, qt.is_dynamic FROM qualys_asset_tags qat
|
|
JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
|
|
WHERE qat.qualys_asset_id = :aid ORDER BY qt.name
|
|
"""), {"aid": asset.qualys_asset_id}).fetchall()
|
|
current_tags = [(r.name, r.is_dynamic) for r in rows]
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME, "result": result, "hostname": hostname,
|
|
"current_tags": current_tags, "auto_prefixes": AUTO_PREFIXES,
|
|
})
|
|
return templates.TemplateResponse("qualys_decoder.html", ctx)
|
|
|
|
|
|
# ═══════════════════════════════════════════════
|
|
# DEPLOIEMENT AGENT QUALYS
|
|
# ═══════════════════════════════════════════════
|
|
|
|
@router.get("/qualys/deploy", response_class=HTMLResponse)
|
|
async def qualys_deploy_page(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
from ..services.agent_deploy_service import list_packages
|
|
from ..services.secrets_service import get_secret
|
|
|
|
packages = list_packages()
|
|
servers = db.execute(text("""
|
|
SELECT s.id, s.hostname, s.os_family, s.os_version, s.etat, s.ssh_user, s.ssh_port, s.ssh_method,
|
|
d.name as domain, e.name as env,
|
|
qa.agent_version, qa.agent_status, qa.last_checkin
|
|
FROM servers s
|
|
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
|
|
LEFT JOIN domains d ON de.domain_id = d.id
|
|
LEFT JOIN environments e ON de.environment_id = e.id
|
|
LEFT JOIN qualys_assets qa ON qa.server_id = s.id
|
|
ORDER BY s.hostname
|
|
""")).fetchall()
|
|
servers = [dict(r._mapping) for r in servers]
|
|
for s in servers:
|
|
if s.get("last_checkin"):
|
|
s["last_checkin"] = str(s["last_checkin"])[:19]
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME,
|
|
"packages": packages,
|
|
"servers": servers,
|
|
"activation_id": get_secret(db, "qualys_activation_id") or "081a9a12-ca97-4de1-828c-c6ad918ce77e",
|
|
"customer_id": get_secret(db, "qualys_customer_id") or "a2e3271b-c2a1-ec6b-8324-8f51948783d4",
|
|
"server_uri": get_secret(db, "qualys_server_uri") or "https://qagpublic.qg2.apps.qualys.eu/CloudAgent/",
|
|
"msg": request.query_params.get("msg", ""),
|
|
})
|
|
return templates.TemplateResponse("qualys_deploy.html", ctx)
|
|
|
|
|
|
@router.post("/qualys/deploy/run")
|
|
async def qualys_deploy_run(request: Request, db=Depends(get_db)):
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not can_edit(perms, "qualys"):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
|
|
from ..services.agent_deploy_service import start_deploy_job
|
|
from ..services.secrets_service import get_secret
|
|
|
|
body = await request.json()
|
|
server_ids = body.get("server_ids", "")
|
|
activation_id = body.get("activation_id", "")
|
|
customer_id = body.get("customer_id", "")
|
|
server_uri = body.get("server_uri", "")
|
|
package_deb = body.get("package_deb", "")
|
|
package_rpm = body.get("package_rpm", "")
|
|
force_downgrade = body.get("force_downgrade", False)
|
|
|
|
ids = [int(x) for x in str(server_ids).split(",") if x.strip().isdigit()]
|
|
if not ids:
|
|
return JSONResponse({"ok": False, "msg": "Aucun serveur sélectionné"})
|
|
|
|
ssh_key = get_secret(db, "ssh_key_file") or "/opt/patchcenter/keys/id_ed25519"
|
|
placeholders = ",".join(str(i) for i in ids)
|
|
rows = db.execute(text(f"""
|
|
SELECT id, hostname, os_family, os_version, ssh_user, ssh_port, ssh_method
|
|
FROM servers WHERE id IN ({placeholders})
|
|
""")).fetchall()
|
|
servers = [{"hostname": r.hostname, "os_family": r.os_family,
|
|
"os_version": r.os_version, "ssh_user": r.ssh_user, "ssh_port": r.ssh_port} for r in rows]
|
|
|
|
job_id = start_deploy_job(servers, ssh_key, package_deb, package_rpm,
|
|
activation_id, customer_id, server_uri, force_downgrade=force_downgrade)
|
|
|
|
from ..services.audit_service import log_action
|
|
log_action(db, request, user, "qualys_deploy",
|
|
entity_type="deploy_job",
|
|
details={"job_id": job_id, "servers": len(servers)})
|
|
db.commit()
|
|
|
|
return JSONResponse({"ok": True, "job_id": job_id, "total": len(servers)})
|
|
|
|
|
|
@router.get("/qualys/deploy/status/{job_id}")
|
|
async def qualys_deploy_status(request: Request, job_id: str, db=Depends(get_db)):
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
|
|
from ..services.agent_deploy_service import get_deploy_job
|
|
job = get_deploy_job(job_id)
|
|
if not job:
|
|
return JSONResponse({"ok": False, "msg": "Job introuvable"}, status_code=404)
|
|
|
|
import time
|
|
elapsed = int(time.time() - job["started_at"])
|
|
return JSONResponse({
|
|
"ok": True,
|
|
"job_id": job_id,
|
|
"total": job["total"],
|
|
"done": job["done"],
|
|
"finished": job["finished"],
|
|
"elapsed": elapsed,
|
|
"servers": job["servers"],
|
|
"log": job["log"][-50:], # dernières 50 lignes
|
|
})
|
|
|
|
|
|
@router.post("/qualys/deploy/check")
|
|
async def qualys_deploy_check(request: Request, db=Depends(get_db)):
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not can_view(perms, "qualys"):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
|
|
from ..services.agent_deploy_service import check_agent
|
|
from ..services.secrets_service import get_secret
|
|
|
|
body = await request.json()
|
|
server_ids = body.get("server_ids", "")
|
|
ids = [int(x) for x in str(server_ids).split(",") if x.strip().isdigit()]
|
|
if not ids:
|
|
return JSONResponse({"ok": False, "msg": "Aucun serveur sélectionné"})
|
|
|
|
ssh_key = get_secret(db, "ssh_key_file") or "/opt/patchcenter/keys/id_ed25519"
|
|
placeholders = ",".join(str(i) for i in ids)
|
|
servers = db.execute(text(f"SELECT id, hostname, ssh_user, ssh_port FROM servers WHERE id IN ({placeholders})")).fetchall()
|
|
|
|
results = []
|
|
for srv in servers:
|
|
r = check_agent(srv.hostname, srv.ssh_user or "root", ssh_key, srv.ssh_port or 22)
|
|
results.append(r)
|
|
|
|
return JSONResponse({"ok": True, "results": results, "total": len(results),
|
|
"active": sum(1 for r in results if r["status"] == "ACTIVE"),
|
|
"not_installed": sum(1 for r in results if r["status"] == "NOT_INSTALLED"),
|
|
"failed": sum(1 for r in results if r["status"] == "CONNECTION_FAILED")})
|