patchcenter/app/routers/servers.py
Admin MPCZ 0be4849ef2 Fix filtres zone/licence perdus lors tri/pagination/export CSV
Les macros sort_url et qs + le lien Export CSV ne propageaient pas les
parametres zone/licence ajoutes recemment. Ajout dans:
- servers.html (macros + export link)
- servers.py (endpoint export-csv: signature + filters dict)
2026-04-14 22:25:57 +02:00

286 lines
14 KiB
Python

"""Router serveurs — CRUD + detail + edit via HTMX"""
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user
from ..services.server_service import (
get_server_full, get_server_tags, get_server_ips,
list_servers, update_server, get_reference_data
)
from ..services.qualys_service import sync_server_qualys
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
@router.get("/servers", response_class=HTMLResponse)
async def servers_list(request: Request, db=Depends(get_db),
domain: str = Query(None), env: str = Query(None),
tier: str = Query(None), etat: str = Query(None),
os: str = Query(None), owner: str = Query(None),
zone: str = Query(None), licence: str = Query(None),
application: str = Query(None), application_id: int = Query(None),
search: str = Query(None), page: int = Query(1),
sort: str = Query("hostname"), sort_dir: str = Query("asc")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
filters = {"domain": domain, "env": env, "tier": tier, "etat": etat, "os": os,
"owner": owner, "zone": zone, "licence": licence,
"application": application, "application_id": application_id,
"search": search}
servers, total = list_servers(db, filters, page, sort=sort, sort_dir=sort_dir)
domains_list, envs_list = get_reference_data(db)
zones_list = db.execute(text("SELECT name, is_dmz FROM zones ORDER BY is_dmz DESC, name")).fetchall()
applications_list = db.execute(text("""SELECT application_name, COUNT(*) as c FROM servers
WHERE application_name IS NOT NULL AND application_name != ''
GROUP BY application_name ORDER BY application_name""")).fetchall()
from ..dependencies import get_user_perms, can_edit
perms = get_user_perms(db, user)
can_edit_servers = can_edit(perms, "servers")
# Correspondances en bulk pour la page en cours
from ..services.correspondance_service import get_links_bulk
links = get_links_bulk(db, [s.id for s in servers])
return templates.TemplateResponse("servers.html", {
"request": request, "user": user, "app_name": APP_NAME,
"servers": servers, "total": total, "page": page, "per_page": 50,
"domains_list": domains_list, "envs_list": envs_list, "zones_list": zones_list,
"applications_list": applications_list, "filters": filters,
"sort": sort, "sort_dir": sort_dir,
"perms": perms, "can_edit_servers": can_edit_servers,
"links": links,
})
@router.get("/servers/export-csv")
async def servers_export_csv(request: Request, db=Depends(get_db),
domain: str = Query(None), env: str = Query(None),
tier: str = Query(None), etat: str = Query(None),
os: str = Query(None), owner: str = Query(None),
zone: str = Query(None), licence: str = Query(None),
search: str = Query(None)):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
import io, csv
filters = {"domain": domain, "env": env, "tier": tier, "etat": etat,
"os": os, "owner": owner, "zone": zone, "licence": licence,
"search": search}
servers, total = list_servers(db, filters, page=1, per_page=99999, sort="hostname", sort_dir="asc")
output = io.StringIO()
w = csv.writer(output, delimiter=";")
w.writerow(["Hostname", "FQDN", "OS", "Version OS", "Domaine", "Environnement",
"Zone", "Tier", "Etat", "Owner patching", "Application"])
for s in servers:
w.writerow([
s.hostname, getattr(s, "fqdn", "") or "", s.os_family or "",
getattr(s, "os_version", "") or "",
getattr(s, "domaine", "") or "", getattr(s, "environnement", "") or "",
getattr(s, "zone_name", "") or "", s.tier or "", s.etat or "",
s.patch_os_owner or "", getattr(s, "application_name", "") or "",
])
output.seek(0)
return StreamingResponse(
iter(["\ufeff" + output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=serveurs.csv"})
@router.get("/servers/{server_id}/detail", response_class=HTMLResponse)
async def server_detail(request: Request, server_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
s = get_server_full(db, server_id)
if not s:
return HTMLResponse("<p>Serveur non trouve</p>")
tags = get_server_tags(db, s.qid)
ips = get_server_ips(db, server_id)
from ..services.correspondance_service import get_server_links
links = get_server_links(db, server_id)
return templates.TemplateResponse("partials/server_detail.html", {
"request": request, "s": s, "tags": tags, "ips": ips, "links": links
})
@router.get("/servers/{server_id}/edit", response_class=HTMLResponse)
async def server_edit(request: Request, server_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
s = get_server_full(db, server_id)
if not s:
return HTMLResponse("<p>Serveur non trouve</p>")
domains, envs = get_reference_data(db)
ips = get_server_ips(db, server_id)
from sqlalchemy import text as sqlt
dns_list = db.execute(sqlt(
"SELECT name FROM domain_ltd_list WHERE is_active = true ORDER BY name"
)).fetchall()
zones_list = db.execute(sqlt(
"SELECT name FROM zones ORDER BY name"
)).fetchall()
applications = db.execute(sqlt(
"SELECT id, nom_court FROM applications WHERE itop_id IS NOT NULL ORDER BY nom_court"
)).fetchall()
return templates.TemplateResponse("partials/server_edit.html", {
"request": request, "s": s, "domains": domains, "envs": envs, "ips": ips,
"dns_list": [r.name for r in dns_list],
"zones_list": [r.name for r in zones_list],
"applications": applications,
})
@router.put("/servers/{server_id}", response_class=HTMLResponse)
async def server_update(request: Request, server_id: int, db=Depends(get_db),
domain_code: str = Form(None), env_code: str = Form(None),
zone: str = Form(None), tier: str = Form(None), etat: str = Form(None),
patch_os_owner: str = Form(None), responsable_nom: str = Form(None),
referent_nom: str = Form(None), mode_operatoire: str = Form(None),
commentaire: str = Form(None),
ip_reelle: str = Form(None), ip_connexion: str = Form(None),
ssh_method: str = Form(None), domain_ltd: str = Form(None),
pref_patch_jour: str = Form(None), pref_patch_heure: str = Form(None),
application_id: str = Form(None)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
from ..dependencies import get_user_perms, can_edit
if not can_edit(get_user_perms(db, user), "servers"):
return HTMLResponse("<p>Permission refusee</p>", status_code=403)
data = {
"domain_code": domain_code, "env_code": env_code, "zone": zone,
"tier": tier, "etat": etat, "patch_os_owner": patch_os_owner,
"responsable_nom": responsable_nom, "referent_nom": referent_nom,
"mode_operatoire": mode_operatoire, "commentaire": commentaire,
"ip_reelle": ip_reelle, "ip_connexion": ip_connexion,
"ssh_method": ssh_method, "domain_ltd": domain_ltd,
"pref_patch_jour": pref_patch_jour, "pref_patch_heure": pref_patch_heure,
}
update_server(db, server_id, data, user.get("sub"))
# Application (changement manuel SecOps) — update + push iTop
if application_id is not None:
app_id_val = int(application_id) if application_id and application_id.strip().isdigit() else None
app_itop_id = None
app_name = None
if app_id_val:
row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"),
{"id": app_id_val}).fetchone()
if row:
app_itop_id = row.itop_id
app_name = row.nom_court
db.execute(text("""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW()
WHERE id=:sid"""), {"aid": app_id_val, "an": app_name, "sid": server_id})
db.commit()
# Push iTop (best effort)
try:
from ..services.itop_service import ITopClient
from ..services.secrets_service import get_secret
srv_row = db.execute(text("SELECT hostname FROM servers WHERE id=:id"), {"id": server_id}).fetchone()
if srv_row:
url = get_secret(db, "itop_url")
u = get_secret(db, "itop_user")
p = get_secret(db, "itop_pass")
if url and u and p:
client = ITopClient(url, u, p)
r = client._call("core/get", **{"class": "VirtualMachine",
"key": f'SELECT VirtualMachine WHERE name = "{srv_row.hostname}"', "output_fields": "name"})
if r.get("objects"):
vm_id = list(r["objects"].values())[0]["key"]
new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else []
client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list})
except Exception:
pass
s = get_server_full(db, server_id)
tags = get_server_tags(db, s.qid)
ips = get_server_ips(db, server_id)
from ..services.correspondance_service import get_server_links
links = get_server_links(db, server_id)
return templates.TemplateResponse("partials/server_detail.html", {
"request": request, "s": s, "tags": tags, "ips": ips, "links": links
})
@router.post("/servers/bulk")
async def servers_bulk(request: Request, db=Depends(get_db),
server_ids: str = Form(""), bulk_field: str = Form(""),
bulk_value: str = Form("")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
from ..dependencies import get_user_perms, can_edit
if not can_edit(get_user_perms(db, user), "servers"):
return RedirectResponse(url="/servers?msg=forbidden", status_code=303)
if not server_ids or not bulk_field or not bulk_value:
return RedirectResponse(url="/servers", status_code=303)
ids = [int(x) for x in server_ids.split(",") if x.strip().isdigit()]
if not ids:
return RedirectResponse(url="/servers", status_code=303)
from sqlalchemy import text as sqlt
if bulk_field in ("tier", "etat", "patch_os_owner", "licence_support"):
db.execute(sqlt(f"UPDATE servers SET {bulk_field} = :val WHERE id = ANY(:ids)"),
{"val": bulk_value, "ids": ids})
elif bulk_field == "domain_code":
# Trouver le domain_env_id correspondant (prod par defaut)
row = db.execute(sqlt("""
SELECT de.id FROM domain_environments de
JOIN domains d ON de.domain_id = d.id
JOIN environments e ON de.environment_id = e.id
WHERE d.code = :dc ORDER BY e.display_order LIMIT 1
"""), {"dc": bulk_value}).fetchone()
if row:
db.execute(sqlt("UPDATE servers SET domain_env_id = :deid WHERE id = ANY(:ids)"),
{"deid": row.id, "ids": ids})
elif bulk_field == "env_code":
# Pour chaque serveur, garder son domaine mais changer l'env
for sid in ids:
srv = db.execute(sqlt("""
SELECT d.id as did FROM servers s
JOIN domain_environments de ON s.domain_env_id = de.id
JOIN domains d ON de.domain_id = d.id
WHERE s.id = :sid
"""), {"sid": sid}).fetchone()
if srv:
de = db.execute(sqlt("""
SELECT de.id FROM domain_environments de
JOIN environments e ON de.environment_id = e.id
WHERE de.domain_id = :did AND e.code = :ec
"""), {"did": srv.did, "ec": bulk_value}).fetchone()
if de:
db.execute(sqlt("UPDATE servers SET domain_env_id = :deid WHERE id = :sid"),
{"deid": de.id, "sid": sid})
db.commit()
return RedirectResponse(url=f"/servers?msg=bulk_{len(ids)}", status_code=303)
@router.post("/servers/{server_id}/sync-qualys", response_class=HTMLResponse)
async def server_sync_qualys(request: Request, server_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
result = sync_server_qualys(db, server_id)
s = get_server_full(db, server_id)
tags = get_server_tags(db, s.qid) if s else []
ips = get_server_ips(db, server_id)
from ..services.correspondance_service import get_server_links
links = get_server_links(db, server_id) if s else {"as_prod": [], "as_nonprod": []}
return templates.TemplateResponse("partials/server_detail.html", {
"request": request, "s": s, "tags": tags, "ips": ips, "links": links,
"sync_msg": result.get("msg"), "sync_ok": result.get("ok"),
})