patchcenter/app/routers/qualys.py
Khalid MOUTAOUAKIL 8e62b1fb11 Qualys complet, contacts, audit refactoré, bulk serveurs
Qualys:
- Recherche API temps réel + cache 24h base locale
- Tags: liste DYN/STAT, mapping V3 (DOM-*, TYP-*, APP-*), nb assets cliquable
- CRUD tags: créer STAT, supprimer, resync API
- Détail asset: infos + décodage nomenclature V3 + tags assignés
- Ajout/retrait tag unitaire avec autocomplete filtrable
- Bulk add/remove tag en masse avec dropdown filtrable
- Tags retirer: charge dynamiquement les STAT assignés aux assets sélectionnés
- Resync assets sélectionnés + retour même recherche

Contacts:
- 50 contacts importés avec 93 scopes (domaine/app/serveur/zone par env)
- 13 rôles (responsable_domaine, ra_prod, ra_recette, referent_technique...)
- Recherche par nom/email/serveur (affiche contacts liés)
- CRUD complet: éditer, scopes, activer/désactiver, supprimer
- Serveurs liés calculés dynamiquement depuis les scopes

Audit:
- Restructuré: Audit général + sous-menu Spécifique
- Dernier audit global affiché avec date
- Lancer audit général avec exclusions (domaines/zones) et parallélisme
- KPIs Qualys KO et S1 KO cliquables
- Export CSV

Serveurs:
- Actions groupées bulk (domaine, env, tier, état, owner, licence)
- Dashboard: KPI EOL ajouté
- Filtre état: EOL + en décommissionnement ajoutés
- 138 serveurs EOL importés depuis Qualys (owner=na, hors périmètre)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 00:47:26 +02:00

519 lines
22 KiB
Python

"""Router Qualys — Tags, Recherche assets, Décodeur nomenclature"""
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
import csv, io, re
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..services.qualys_service import (
sync_server_qualys, search_assets_api, get_all_tags_api,
create_tag_api, delete_tag_api, add_tag_to_asset_api,
remove_tag_from_asset_api, resync_all_tags,
)
from ..config import APP_NAME
# Router on which every Qualys endpoint of this module is registered.
router = APIRouter()
# Template loader; template names used below are resolved under "app/templates".
templates = Jinja2Templates(directory="app/templates")
# Nomenclature maps (from server_decoder.py)
# ENV_MAP: environment letter (second character of a hostname, see
# _decode_hostname) -> (display name, Qualys environment tag).
ENV_MAP = {'p': ('Production', 'ENV-PRD'), 'r': ('Recette', 'ENV-REC'), 'i': ('Pré-Prod', 'ENV-PPR'),
'v': ('Test', 'ENV-TST'), 'd': ('Développement', 'ENV-DEV'), 't': ('Test', 'ENV-TST'),
's': ('Production', 'ENV-PRD'), 'o': ('Pré-Prod', 'ENV-PPR')}
# DOMAIN_MAP: application prefix read right after the first two hostname
# characters (see _decode_hostname) -> (domain display name, Qualys tag).
# Several prefixes deliberately share the same domain.
DOMAIN_MAP = {
'bot': ('Flux Libre', 'DOM-FL'), 'boo': ('Flux Libre', 'DOM-FL'), 'boc': ('Flux Libre', 'DOM-FL'),
'afl': ('Flux Libre', 'DOM-FL'), 'sup': ('Flux Libre', 'DOM-FL'),
'dsi': ('Infrastructure', 'DOM-INF'), 'cyb': ('Infrastructure', 'DOM-INF'),
'vsa': ('Infrastructure', 'DOM-INF'), 'iad': ('Infrastructure', 'DOM-INF'),
'bur': ('Infrastructure', 'DOM-INF'), 'aii': ('Infrastructure', 'DOM-INF'),
'ecm': ('Infrastructure', 'DOM-INF'), 'log': ('Infrastructure', 'DOM-INF'),
'bck': ('Infrastructure', 'DOM-INF'), 'gaw': ('Infrastructure', 'DOM-INF'),
'vid': ('Infrastructure', 'DOM-INF'), 'sim': ('Infrastructure', 'DOM-INF'),
'emv': ('EMV', 'TAG-EMV'), 'pci': ('EMV', 'TAG-EMV'),
'pea': ('Péage', 'DOM-PEA'), 'osa': ('Péage', 'DOM-PEA'), 'svp': ('Péage', 'DOM-PEA'),
'adv': ('Péage', 'DOM-PEA'), 'rpa': ('Péage', 'DOM-PEA'),
'ame': ('Trafic', 'DOM-TRA'), 'tra': ('Trafic', 'DOM-TRA'), 'dai': ('Trafic', 'DOM-TRA'),
'pat': ('Trafic', 'DOM-TRA'), 'dep': ('Trafic', 'DOM-TRA'), 'exp': ('Trafic', 'DOM-TRA'),
'sig': ('Trafic', 'DOM-TRA'), 'rau': ('Trafic', 'DOM-TRA'),
'dec': ('BI', 'DOM-BI'), 'sas': ('BI', 'DOM-BI'), 'bip': ('BI', 'DOM-BI'),
'int': ('Gestion', 'DOM-GES'), 'agt': ('Gestion', 'DOM-GES'), 'pin': ('Gestion', 'DOM-GES'),
'ech': ('Gestion', 'DOM-GES'),
}
# Tag-name prefixes handed to the decoder template as "auto_prefixes".
# NOTE(review): presumably the prefixes of automatically assigned tags — confirm
# against qualys_decoder.html.
AUTO_PREFIXES = ('ENV-', 'OS-', 'DOM-', 'TYP-', 'TAG-OBS', 'TAG-EMV')
def _save_api_results_to_db(db, assets):
    """Store Qualys API search results in the local cache tables.

    Every asset that carries a ``qualys_asset_id`` is upserted into
    ``qualys_assets`` (conflict key: ``qualys_asset_id``) and linked to the
    ``servers`` row whose hostname matches the asset's short hostname, when
    one exists. When the API result lists tags, the asset's rows in
    ``qualys_asset_tags`` are replaced by the listed tags that are known in
    ``qualys_tags``; unknown tag names are skipped. An asset without tags in
    the result keeps its existing links.

    Args:
        db: database handle exposing ``execute`` and ``commit``.
        assets: iterable of dicts, as found under ``"assets"`` in the result
            of ``search_assets_api``.
    """
    for a in assets:
        if not a.get("qualys_asset_id"):
            continue
        # ``dict.get(key, default)`` still yields None when the key exists
        # with a null value, so fall back with ``or`` before calling str
        # methods on the result.
        raw_name = a.get("hostname") or a.get("name") or ""
        hostname = raw_name.split(".")[0].lower()
        os_val = a.get("os") or ""
        os_lower = os_val.lower()
        if any(k in os_lower for k in ("linux", "red hat", "centos")):
            os_family = "linux"
        elif "windows" in os_lower:
            os_family = "windows"
        else:
            os_family = None
        srv = db.execute(
            text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
            {"h": hostname},
        ).fetchone()
        server_id = srv.id if srv else None
        # COALESCE keeps an existing server link when this run found no match.
        db.execute(text("""
            INSERT INTO qualys_assets (qualys_asset_id, name, hostname, fqdn, ip_address, os, os_family,
                agent_status, agent_version, last_checkin, server_id)
            VALUES (:qid, :name, :hn, :fqdn, :ip, :os, :osf, :ast, :av, :lc, :sid)
            ON CONFLICT (qualys_asset_id) DO UPDATE SET
                name=EXCLUDED.name, fqdn=EXCLUDED.fqdn, ip_address=EXCLUDED.ip_address,
                os=EXCLUDED.os, os_family=EXCLUDED.os_family,
                agent_status=EXCLUDED.agent_status, agent_version=EXCLUDED.agent_version,
                last_checkin=EXCLUDED.last_checkin, server_id=COALESCE(EXCLUDED.server_id, qualys_assets.server_id),
                updated_at=now()
        """), {
            "qid": a["qualys_asset_id"], "name": a.get("name"), "hn": hostname,
            "fqdn": a.get("fqdn") or None, "ip": a.get("ip_address") or None,
            "os": os_val, "osf": os_family,
            "ast": a.get("agent_status", ""), "av": a.get("agent_version", ""),
            "lc": a.get("last_checkin") or None, "sid": server_id,
        })
        if a.get("tags"):
            # Replace the asset's tag links with the set reported by the API.
            db.execute(
                text("DELETE FROM qualys_asset_tags WHERE qualys_asset_id = :qid"),
                {"qid": a["qualys_asset_id"]},
            )
            for tag_name in a["tags"]:
                tag_row = db.execute(
                    text("SELECT qualys_tag_id FROM qualys_tags WHERE name = :n"),
                    {"n": tag_name},
                ).fetchone()
                if tag_row:
                    db.execute(text("""
                        INSERT INTO qualys_asset_tags (qualys_asset_id, qualys_tag_id)
                        VALUES (:qid, :tid) ON CONFLICT DO NOTHING
                    """), {"qid": a["qualys_asset_id"], "tid": tag_row.qualys_tag_id})
    # Single commit once every asset has been processed.
    db.commit()
def _decode_hostname(hostname):
"""Décode un hostname SANEF et retourne les tags suggérés"""
hn = hostname.lower().strip()
if hn.startswith('ls-'):
return {'type': 'Physique', 'env': 'Production', 'domain': 'Péage', 'tags': ['ENV-PRD', 'DOM-PEA', 'TYP-SRV', 'OS-WIN']}
if len(hn) < 4:
return {'type': '?', 'env': '?', 'domain': '?', 'tags': []}
# Type
machine = 'VM' if hn[0] == 'v' else 'Physique'
eqt = 'TYP-VIR' if hn[0] == 'v' else 'TYP-SRV'
# Env
env_info = ENV_MAP.get(hn[1], ('?', None))
env_name, env_tag = env_info
# Domain (try 4, 3, 2 char prefixes)
rest = hn[2:]
domain_name, domain_tag = '?', None
for length in (4, 3, 2):
prefix = rest[:length]
if prefix in DOMAIN_MAP:
domain_name, domain_tag = DOMAIN_MAP[prefix]
break
tags = [t for t in [env_tag, domain_tag, eqt] if t]
return {'type': machine, 'env': env_name, 'domain': domain_name, 'tags': tags}
# === TAGS ===
@router.get("/qualys/tags", response_class=HTMLResponse)
async def qualys_tags(request: Request, db=Depends(get_db),
                      search: str = Query(None), tag_type: str = Query(None)):
    """Render the Qualys tag list page.

    Args:
        request: incoming request (session user, ``msg`` query parameter).
        db: database handle injected by ``get_db``.
        search: optional substring matched case-insensitively on the tag name.
        tag_type: ``"dyn"`` or ``"stat"`` to keep only dynamic / static tags;
            any other value applies no type filter.

    Returns:
        The rendered ``qualys_tags.html`` page, or a redirect to ``/login``
        (no user) or ``/dashboard`` (no view permission on "qualys").
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    # Only fixed SQL fragments are concatenated; user input goes through binds.
    conditions = ["1=1"]
    bind = {}
    if search:
        conditions.append("qt.name ILIKE :q")
        bind["q"] = f"%{search}%"
    type_condition = {
        "dyn": "qt.is_dynamic = true",
        "stat": "qt.is_dynamic = false",
    }.get(tag_type)
    if type_condition:
        conditions.append(type_condition)
    where_clause = " AND ".join(conditions)

    tag_rows = db.execute(text(f"""
        SELECT qt.*,
            (SELECT COUNT(*) FROM qualys_asset_tags qat WHERE qat.qualys_tag_id = qt.qualys_tag_id) as asset_count
        FROM qualys_tags qt WHERE {where_clause} ORDER BY qt.name
    """), bind).fetchall()

    # Global counters, independent of the current filters.
    counters = db.execute(text("""
        SELECT COUNT(*) as total,
            COUNT(*) FILTER (WHERE is_dynamic) as dyn,
            COUNT(*) FILTER (WHERE NOT is_dynamic) as stat
        FROM qualys_tags
    """)).fetchone()

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "tags": tag_rows,
        "stats": counters,
        "search": search,
        "tag_type": tag_type,
        "can_edit_qualys": can_edit(perms, "qualys"),
        "msg": request.query_params.get("msg"),
    })
    return templates.TemplateResponse("qualys_tags.html", ctx)
@router.post("/qualys/tags/resync")
async def qualys_tags_resync(request: Request, db=Depends(get_db)):
    """Resynchronise the local tag table from the Qualys API.

    Requires a logged-in user with edit permission on "qualys"; the tag page
    only hides the button, so the permission is enforced here as well.

    Returns:
        A 303 redirect to the tag list with ``msg=resync_ok`` / ``resync_ko``,
        to ``/login`` when there is no user, or to ``/dashboard`` when the
        user may not edit.
    """
    user = get_current_user(request)
    if not user:
        # 303 turns the follow-up request into a GET; the default 307 would
        # replay this POST against /login.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard", status_code=303)
    result = resync_all_tags(db)
    msg = "resync_ok" if result["ok"] else "resync_ko"
    return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
@router.post("/qualys/tags/create")
async def qualys_tag_create(request: Request, db=Depends(get_db),
                            tag_name: str = Form(...)):
    """Create a tag through the Qualys API.

    Requires a logged-in user with edit permission on "qualys". A name that
    is empty after stripping is rejected without calling the API.

    Args:
        tag_name: name of the tag to create (surrounding whitespace removed).

    Returns:
        A 303 redirect to the tag list with ``msg=created`` / ``create_error``,
        to ``/login`` when there is no user, or to ``/dashboard`` when the
        user may not edit.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser follows with a GET instead of replaying the POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard", status_code=303)
    name = tag_name.strip()
    if not name:
        # Nothing meaningful to send to the API.
        return RedirectResponse(url="/qualys/tags?msg=create_error", status_code=303)
    result = create_tag_api(db, name)
    msg = "created" if result["ok"] else "create_error"
    return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
@router.post("/qualys/tags/{tag_id}/delete")
async def qualys_tag_delete(request: Request, tag_id: int, db=Depends(get_db)):
    """Delete a tag through the Qualys API.

    Requires a logged-in user with edit permission on "qualys".

    Args:
        tag_id: identifier of the tag, taken from the URL path.

    Returns:
        A 303 redirect to the tag list with ``msg=deleted`` / ``delete_error``,
        to ``/login`` when there is no user, or to ``/dashboard`` when the
        user may not edit.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser follows with a GET instead of replaying the POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard", status_code=303)
    result = delete_tag_api(db, tag_id)
    msg = "deleted" if result["ok"] else "delete_error"
    return RedirectResponse(url=f"/qualys/tags?msg={msg}", status_code=303)
@router.post("/qualys/asset/{asset_id}/tag/add")
async def qualys_asset_tag_add(request: Request, asset_id: int, db=Depends(get_db),
                               tag_id: str = Form(...)):
    """Assign one tag to one asset and return a small HTML status fragment.

    Requires a logged-in user with edit permission on "qualys".

    Args:
        asset_id: Qualys asset identifier from the URL path.
        tag_id: tag identifier posted by the form; must parse as an integer.

    Returns:
        An HTML ``<span>`` (green on success, red on failure) carrying the
        service message, a red span for an invalid tag or a missing
        permission (403), or a 303 redirect to ``/login`` without a user.
    """
    from html import escape

    user = get_current_user(request)
    if not user:
        # 303 so the follow-up request is a GET, not a replayed POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return HTMLResponse('<span class="text-xs text-cyber-red">Non autorisé</span>',
                            status_code=403)
    try:
        tag = int(tag_id)
    except ValueError:
        # A malformed form value is reported in the fragment instead of a 500.
        return HTMLResponse('<span class="text-xs text-cyber-red">Tag invalide</span>')
    result = add_tag_to_asset_api(db, asset_id, tag)
    color = "text-cyber-green" if result["ok"] else "text-cyber-red"
    # The message may carry text coming back from the API: escape it.
    return HTMLResponse(f'<span class="text-xs {color}">{escape(str(result["msg"]))}</span>')
@router.post("/qualys/asset/{asset_id}/tag/remove")
async def qualys_asset_tag_remove(request: Request, asset_id: int, db=Depends(get_db),
                                  tag_id: str = Form(...)):
    """Remove one tag from one asset and return a small HTML status fragment.

    Requires a logged-in user with edit permission on "qualys".

    Args:
        asset_id: Qualys asset identifier from the URL path.
        tag_id: tag identifier posted by the form; must parse as an integer.

    Returns:
        An HTML ``<span>`` (green on success, red on failure) carrying the
        service message, a red span for an invalid tag or a missing
        permission (403), or a 303 redirect to ``/login`` without a user.
    """
    from html import escape

    user = get_current_user(request)
    if not user:
        # 303 so the follow-up request is a GET, not a replayed POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return HTMLResponse('<span class="text-xs text-cyber-red">Non autorisé</span>',
                            status_code=403)
    try:
        tag = int(tag_id)
    except ValueError:
        # A malformed form value is reported in the fragment instead of a 500.
        return HTMLResponse('<span class="text-xs text-cyber-red">Tag invalide</span>')
    result = remove_tag_from_asset_api(db, asset_id, tag)
    color = "text-cyber-green" if result["ok"] else "text-cyber-red"
    # The message may carry text coming back from the API: escape it.
    return HTMLResponse(f'<span class="text-xs {color}">{escape(str(result["msg"]))}</span>')
def _bulk_return_url(form, msg):
"""Construit l'URL de retour avec la recherche conservée"""
s = form.get("return_search", "")
f = form.get("return_field", "hostname")
return f"/qualys/search?field={f}&search={s}&msg={msg}"
@router.post("/qualys/bulk/add-tag")
async def qualys_bulk_add_tag(request: Request, db=Depends(get_db)):
    """Assign one tag to every selected asset.

    Requires a logged-in user with edit permission on "qualys". Reads
    ``asset_ids`` (comma-separated integers) and ``tag_id`` from the posted
    form; non-numeric asset ids are ignored. A missing or non-numeric
    ``tag_id`` counts every asset as failed without calling the API.

    Returns:
        A 303 redirect back to the search page with
        ``msg=bulk_add_<ok>_<ko>``, to ``/login`` when there is no user, or
        to ``/dashboard`` when the user may not edit.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser follows with a GET instead of replaying the POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard", status_code=303)
    form = await request.form()
    # isdecimal() only accepts characters that int() can actually parse.
    ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdecimal()]
    raw_tag = (form.get("tag_id") or "").strip()
    tid = int(raw_tag) if raw_tag.isdecimal() else 0
    ok = 0
    ko = 0
    if not tid:
        # No usable tag: nothing can succeed.
        ko = len(ids)
    else:
        for aid in ids:
            if add_tag_to_asset_api(db, aid, tid)["ok"]:
                ok += 1
            else:
                ko += 1
    return RedirectResponse(url=_bulk_return_url(form, f"bulk_add_{ok}_{ko}"), status_code=303)
@router.post("/qualys/bulk/remove-tag")
async def qualys_bulk_remove_tag(request: Request, db=Depends(get_db)):
    """Remove one tag from every selected asset.

    Requires a logged-in user with edit permission on "qualys". Reads
    ``asset_ids`` (comma-separated integers) and ``tag_id`` from the posted
    form; non-numeric asset ids are ignored. A missing or non-numeric
    ``tag_id`` counts every asset as failed without calling the API.

    Returns:
        A 303 redirect back to the search page with
        ``msg=bulk_rm_<ok>_<ko>``, to ``/login`` when there is no user, or
        to ``/dashboard`` when the user may not edit.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser follows with a GET instead of replaying the POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard", status_code=303)
    form = await request.form()
    # isdecimal() only accepts characters that int() can actually parse.
    ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdecimal()]
    raw_tag = (form.get("tag_id") or "").strip()
    tid = int(raw_tag) if raw_tag.isdecimal() else 0
    ok = 0
    ko = 0
    if not tid:
        # No usable tag: nothing can succeed.
        ko = len(ids)
    else:
        for aid in ids:
            if remove_tag_from_asset_api(db, aid, tid)["ok"]:
                ok += 1
            else:
                ko += 1
    return RedirectResponse(url=_bulk_return_url(form, f"bulk_rm_{ok}_{ko}"), status_code=303)
@router.post("/qualys/resync-assets")
async def qualys_resync_assets(request: Request, db=Depends(get_db)):
    """Refresh the selected assets from the Qualys API.

    Requires a logged-in user with edit permission on "qualys", since the
    handler calls the external API and rewrites the local cache. Each id in
    the posted ``asset_ids`` (comma-separated integers) is looked up by id
    and, when found, saved to the local tables.

    Returns:
        A 303 redirect back to the search page with ``msg=resync_<count>``
        (number of assets refreshed), to ``/login`` when there is no user, or
        to ``/dashboard`` when the user may not edit.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser follows with a GET instead of replaying the POST.
        return RedirectResponse(url="/login", status_code=303)
    if not can_edit(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard", status_code=303)
    form = await request.form()
    # isdecimal() only accepts characters that int() can actually parse.
    ids = [int(x) for x in form.get("asset_ids", "").split(",") if x.strip().isdecimal()]
    ok = 0
    for aid in ids:
        result = search_assets_api(db, str(aid), field="id", operator="EQUALS")
        if result.get("ok") and result.get("assets"):
            _save_api_results_to_db(db, result["assets"])
            ok += 1
    return RedirectResponse(url=_bulk_return_url(form, f"resync_{ok}"), status_code=303)
@router.get("/qualys/bulk/tags-for-assets")
async def qualys_tags_for_assets(request: Request, db=Depends(get_db),
                                 asset_ids: str = Query("")):
    """Return, as JSON, the static tags assigned to the selected assets.

    Args:
        asset_ids: comma-separated asset ids; non-numeric entries are ignored.

    Returns:
        A JSON list of ``{"id", "name", "count"}`` objects ordered by tag
        name, where ``count`` is the number of selected assets carrying the
        tag. The list is empty when there is no user or no valid id.
    """
    from fastapi.responses import JSONResponse

    if not get_current_user(request):
        return JSONResponse([])

    wanted = [int(tok) for tok in asset_ids.split(",") if tok.strip().isdigit()]
    if not wanted:
        return JSONResponse([])

    # One named bind per id; the IN list is built from the bind names only.
    bind = {f"id{pos}": asset_id for pos, asset_id in enumerate(wanted)}
    placeholders = ",".join(f":{key}" for key in bind)
    rows = db.execute(text(f"""
        SELECT qt.qualys_tag_id as id, qt.name, COUNT(DISTINCT qat.qualys_asset_id) as count
        FROM qualys_asset_tags qat
        JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
        WHERE qat.qualys_asset_id IN ({placeholders}) AND qt.is_dynamic = false
        GROUP BY qt.qualys_tag_id, qt.name
        ORDER BY qt.name
    """), bind).fetchall()

    payload = []
    for row in rows:
        payload.append({"id": row.id, "name": row.name, "count": row.count})
    return JSONResponse(payload)
@router.get("/qualys/tags/export")
async def qualys_tags_export(request: Request, db=Depends(get_db)):
    """Export every Qualys tag as a semicolon-separated CSV file.

    Requires a logged-in user with view permission on "qualys". The
    "Nb assets" column holds the number of assets linked to the tag in
    ``qualys_asset_tags``, using the same count as the tag list page.

    Returns:
        A CSV attachment named ``qualys_tags.csv`` (prefixed with a BOM), or
        a redirect to ``/login`` / ``/dashboard``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_view(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard")
    tags = db.execute(text("""
        SELECT qt.name, qt.qualys_tag_id, qt.is_dynamic,
            (SELECT COUNT(*) FROM qualys_asset_tags qat WHERE qat.qualys_tag_id = qt.qualys_tag_id) as asset_count
        FROM qualys_tags qt ORDER BY qt.name
    """)).fetchall()
    output = io.StringIO()
    writer = csv.writer(output, delimiter=";")
    writer.writerow(["Nom", "ID Qualys", "Type", "Nb assets"])
    for t in tags:
        writer.writerow([t.name, t.qualys_tag_id, "DYN" if t.is_dynamic else "STAT", t.asset_count])
    # The leading BOM is part of the payload sent to the client.
    return StreamingResponse(iter(["\ufeff" + output.getvalue()]), media_type="text/csv",
                             headers={"Content-Disposition": "attachment; filename=qualys_tags.csv"})
# === RECHERCHE ASSETS ===
@router.get("/qualys/search", response_class=HTMLResponse)
async def qualys_search(request: Request, db=Depends(get_db),
                        search: str = Query(None), field: str = Query("hostname")):
    """Render the asset search page, using the local table as a 24 h cache.

    When ``search`` is given, the handler first counts matching local rows
    updated within the last 24 hours. If there is none, it queries the Qualys
    API and stores the results. In every case the displayed rows are then
    read from the local tables (200 rows at most).

    Args:
        search: text to look for; nothing is searched when empty.
        field: ``"hostname"``, ``"ip"`` or ``"tag"``. Any other value skips
            the cache check, reports "Champ inconnu" and applies no filter to
            the final read.

    Returns:
        The rendered ``qualys_search.html`` page, or a redirect to ``/login``
        (no user) or ``/dashboard`` (no view permission on "qualys").
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    assets = []
    api_msg = None
    if search:
        from datetime import datetime, timedelta

        cutoff = datetime.now() - timedelta(hours=24)
        contains = f"%{search}%"
        starts_with = f"{search}%"

        # 1) Do we hold recent (< 24h) matching rows locally?
        fresh_checks = {
            "hostname": ("""
                SELECT COUNT(*) FROM qualys_assets
                WHERE (hostname ILIKE :q OR name ILIKE :q OR fqdn ILIKE :q)
                AND updated_at > :cutoff
            """, contains),
            "ip": ("""
                SELECT COUNT(*) FROM qualys_assets
                WHERE CAST(ip_address AS TEXT) LIKE :q AND updated_at > :cutoff
            """, starts_with),
            "tag": ("""
                SELECT COUNT(*) FROM qualys_assets qa
                WHERE qa.qualys_asset_id IN (
                    SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
                    JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                    WHERE qt.name ILIKE :q
                ) AND qa.updated_at > :cutoff
            """, contains),
        }
        if field in fresh_checks:
            count_sql, pattern = fresh_checks[field]
            fresh = db.execute(text(count_sql), {"q": pattern, "cutoff": cutoff}).scalar()
        else:
            fresh = 0

        # 2) Pick the data source: local cache, or API call stored locally.
        if fresh > 0:
            source = "base (< 24h)"
        else:
            api_lookup = {
                "hostname": ("name", "CONTAINS"),
                "ip": ("address", "CONTAINS"),
                "tag": ("tagName", "EQUALS"),
            }
            if field in api_lookup:
                api_field, api_operator = api_lookup[field]
                result = search_assets_api(db, search, field=api_field, operator=api_operator)
            else:
                result = {"ok": False, "msg": "Champ inconnu", "assets": []}

            succeeded = result.get("ok")
            found = result.get("assets")
            if succeeded and found:
                _save_api_results_to_db(db, found)
                source = f"API Qualys ({len(found)} résultats)"
            elif not succeeded:
                # The local table is still read below, even if it is stale.
                source = f"Erreur API: {result.get('msg', '?')} — fallback base"
            else:
                source = "API: aucun résultat"

        # 3) Always read the rows to display from the local tables.
        read_filters = {
            "hostname": ("(qa.hostname ILIKE :q OR qa.name ILIKE :q OR qa.fqdn ILIKE :q)", contains),
            "ip": ("CAST(qa.ip_address AS TEXT) LIKE :q", starts_with),
            "tag": ("""qa.qualys_asset_id IN (
                SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
                JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                WHERE qt.name ILIKE :q)""", contains),
        }
        clauses = ["1=1"]
        bind = {}
        if field in read_filters:
            clause, pattern = read_filters[field]
            clauses.append(clause)
            bind["q"] = pattern
        where_clause = " AND ".join(clauses)
        assets = db.execute(text(f"""
            SELECT qa.*, s.hostname as srv_hostname, s.tier,
                (SELECT string_agg(qt.name, ', ' ORDER BY qt.name)
                 FROM qualys_asset_tags qat JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                 WHERE qat.qualys_asset_id = qa.qualys_asset_id) as tags_list
            FROM qualys_assets qa
            LEFT JOIN servers s ON qa.server_id = s.id
            WHERE {where_clause} ORDER BY qa.hostname LIMIT 200
        """), bind).fetchall()
        api_msg = f"{len(assets)} résultat(s) — source: {source}"

    # Tag list passed to the template as "all_tags".
    all_tags = db.execute(text("SELECT qualys_tag_id, name FROM qualys_tags ORDER BY name")).fetchall()
    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "assets": assets,
        "search": search,
        "field": field,
        "api_msg": api_msg,
        "all_tags": all_tags,
        "can_edit_qualys": can_edit(perms, "qualys"),
        "msg": request.query_params.get("msg"),
    })
    return templates.TemplateResponse("qualys_search.html", ctx)
@router.get("/qualys/asset/{asset_id}", response_class=HTMLResponse)
async def qualys_asset_detail(request: Request, asset_id: int, db=Depends(get_db)):
    """Render the detail partial of one locally stored Qualys asset.

    Args:
        asset_id: Qualys asset identifier from the URL path.

    Returns:
        The ``partials/qualys_asset_detail.html`` fragment with the asset row,
        its tags, the decoded hostname and the full tag list; or a short HTML
        paragraph when there is no user or the asset is unknown.
    """
    if not get_current_user(request):
        return HTMLResponse("<p>Non autorisé</p>")

    by_asset = {"aid": asset_id}
    asset = db.execute(
        text("SELECT * FROM qualys_assets WHERE qualys_asset_id = :aid"),
        by_asset,
    ).fetchone()
    if not asset:
        return HTMLResponse("<p>Asset non trouvé</p>")

    assigned = db.execute(text("""
        SELECT qt.name, qt.is_dynamic, qt.qualys_tag_id
        FROM qualys_asset_tags qat JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
        WHERE qat.qualys_asset_id = :aid ORDER BY qt.name
    """), by_asset).fetchall()

    # Decode from the hostname, falling back to the asset name.
    label = asset.hostname or asset.name or ""
    decoded = _decode_hostname(label)

    catalogue = db.execute(text(
        "SELECT qualys_tag_id, name, is_dynamic FROM qualys_tags ORDER BY name"
    )).fetchall()

    context = {
        "request": request,
        "a": asset,
        "tags": assigned,
        "decoded": decoded,
        "all_tags": catalogue,
    }
    return templates.TemplateResponse("partials/qualys_asset_detail.html", context)
@router.get("/qualys/search/export")
async def qualys_search_export(request: Request, db=Depends(get_db),
                               search: str = Query(""), field: str = Query("hostname")):
    """Export the locally stored assets matching a search as CSV.

    Requires a logged-in user with view permission on "qualys". The filter
    mirrors the search page for ``hostname``, ``ip`` and ``tag``; an empty
    search exports every asset. At most 1000 rows are written.

    Args:
        search: text to look for (empty means no filter).
        field: ``"hostname"``, ``"ip"`` or ``"tag"``.

    Returns:
        A semicolon-separated CSV attachment (prefixed with a BOM), or a
        redirect to ``/login`` / ``/dashboard``.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_view(get_user_perms(db, user), "qualys"):
        return RedirectResponse(url="/dashboard")
    # Re-run the search against the local tables.
    where = ["1=1"]
    params = {}
    if search:
        if field == "hostname":
            where.append("(qa.hostname ILIKE :q OR qa.name ILIKE :q OR qa.fqdn ILIKE :q)")
            params["q"] = f"%{search}%"
        elif field == "ip":
            where.append("CAST(qa.ip_address AS TEXT) LIKE :q")
            params["q"] = f"{search}%"
        elif field == "tag":
            # Same tag filter as the search page, so a tag search no longer
            # exports the whole table.
            where.append("""qa.qualys_asset_id IN (
                SELECT qat.qualys_asset_id FROM qualys_asset_tags qat
                JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                WHERE qt.name ILIKE :q)""")
            params["q"] = f"%{search}%"
    wc = " AND ".join(where)
    assets = db.execute(text(f"""
        SELECT qa.hostname, qa.ip_address, qa.fqdn, qa.os, qa.agent_status,
            qa.agent_version, qa.last_checkin, qa.qualys_asset_id
        FROM qualys_assets qa WHERE {wc} ORDER BY qa.hostname LIMIT 1000
    """), params).fetchall()
    output = io.StringIO()
    writer = csv.writer(output, delimiter=";")
    writer.writerow(["Hostname", "IP", "FQDN", "OS", "Agent", "Version", "Dernier check-in", "Qualys ID"])
    for a in assets:
        writer.writerow([a.hostname, a.ip_address, a.fqdn, a.os, a.agent_status,
                         a.agent_version, a.last_checkin, a.qualys_asset_id])
    # User input must not reach the header verbatim: keep only characters
    # that are safe in an unquoted filename.
    safe_name = re.sub(r"[^A-Za-z0-9._-]+", "_", search or "").strip("_") or "all"
    return StreamingResponse(iter(["\ufeff" + output.getvalue()]), media_type="text/csv",
                             headers={"Content-Disposition": f"attachment; filename=qualys_assets_{safe_name}.csv"})
# === DECODEUR ===
@router.get("/qualys/decoder", response_class=HTMLResponse)
async def qualys_decoder(request: Request, db=Depends(get_db),
                         hostname: str = Query(None)):
    """Render the hostname decoder page.

    When ``hostname`` is given, it is decoded with ``_decode_hostname`` and
    the tags currently stored for the matching local asset (looked up by
    short, lower-cased hostname) are listed next to the suggestion.

    Returns:
        The rendered ``qualys_decoder.html`` page, or a redirect to ``/login``
        (no user) or ``/dashboard`` (no view permission on "qualys").
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "qualys"):
        return RedirectResponse(url="/dashboard")

    result = None
    current_tags = []
    if hostname:
        result = _decode_hostname(hostname)
        result["hostname"] = hostname

        # Look up the tags currently stored for this host.
        short_name = hostname.strip().split(".")[0].lower()
        asset = db.execute(
            text("SELECT qualys_asset_id FROM qualys_assets WHERE LOWER(hostname) = LOWER(:h)"),
            {"h": short_name},
        ).fetchone()
        if asset:
            tag_rows = db.execute(text("""
                SELECT qt.name, qt.is_dynamic FROM qualys_asset_tags qat
                JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
                WHERE qat.qualys_asset_id = :aid ORDER BY qt.name
            """), {"aid": asset.qualys_asset_id}).fetchall()
            for row in tag_rows:
                current_tags.append((row.name, row.is_dynamic))

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "result": result,
        "hostname": hostname,
        "current_tags": current_tags,
        "auto_prefixes": AUTO_PREFIXES,
    })
    return templates.TemplateResponse("qualys_decoder.html", ctx)