283 lines
13 KiB
Python
283 lines
13 KiB
Python
"""Router serveurs — CRUD + detail + edit via HTMX"""
|
|
from fastapi import APIRouter, Request, Depends, Query, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db, get_current_user
|
|
from ..services.server_service import (
|
|
get_server_full, get_server_tags, get_server_ips,
|
|
list_servers, update_server, get_reference_data
|
|
)
|
|
from ..services.qualys_service import sync_server_qualys
|
|
from ..config import APP_NAME
|
|
|
|
# Router on which every endpoint of this module is registered.
router = APIRouter()
|
|
# Jinja2 template loader; template names used below are resolved under app/templates.
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
@router.get("/servers", response_class=HTMLResponse)
async def servers_list(request: Request, db=Depends(get_db),
                       domain: str = Query(None), env: str = Query(None),
                       tier: str = Query(None), etat: str = Query(None),
                       os: str = Query(None), owner: str = Query(None),
                       zone: str = Query(None),
                       application: str = Query(None), application_id: int = Query(None),
                       search: str = Query(None), page: int = Query(1),
                       sort: str = Query("hostname"), sort_dir: str = Query("asc")):
    """Render the server list page.

    Filters, page number and sort settings come from the query string and are
    forwarded to ``list_servers``. Anonymous visitors are redirected to
    ``/login``. The template also receives the reference lists used by the
    filter widgets, the caller's permissions and the bulk-loaded links for
    the servers of the current page.
    """
    current_user = get_current_user(request)
    if not current_user:
        return RedirectResponse(url="/login")

    filters = {
        "domain": domain,
        "env": env,
        "tier": tier,
        "etat": etat,
        "os": os,
        "owner": owner,
        "zone": zone,
        "application": application,
        "application_id": application_id,
        "search": search,
    }
    servers, total = list_servers(db, filters, page, sort=sort, sort_dir=sort_dir)
    domains_list, envs_list = get_reference_data(db)

    # Reference lists feeding the filter drop-downs.
    zones_query = text("SELECT name, is_dmz FROM zones ORDER BY is_dmz DESC, name")
    zones_list = db.execute(zones_query).fetchall()

    applications_query = text("""SELECT application_name, COUNT(*) as c FROM servers
        WHERE application_name IS NOT NULL AND application_name != ''
        GROUP BY application_name ORDER BY application_name""")
    applications_list = db.execute(applications_query).fetchall()

    from ..dependencies import get_user_perms, can_edit
    perms = get_user_perms(db, current_user)

    # Links are fetched in one call for every server shown on this page.
    from ..services.correspondance_service import get_links_bulk
    page_server_ids = [row.id for row in servers]
    links = get_links_bulk(db, page_server_ids)

    context = {
        "request": request,
        "user": current_user,
        "app_name": APP_NAME,
        "servers": servers,
        "total": total,
        "page": page,
        "per_page": 50,
        "domains_list": domains_list,
        "envs_list": envs_list,
        "zones_list": zones_list,
        "applications_list": applications_list,
        "filters": filters,
        "sort": sort,
        "sort_dir": sort_dir,
        "perms": perms,
        "can_edit_servers": can_edit(perms, "servers"),
        "links": links,
    }
    return templates.TemplateResponse("servers.html", context)
|
|
|
|
|
|
|
|
@router.get("/servers/export-csv")
async def servers_export_csv(request: Request, db=Depends(get_db),
                             domain: str = Query(None), env: str = Query(None),
                             tier: str = Query(None), etat: str = Query(None),
                             os: str = Query(None), owner: str = Query(None),
                             search: str = Query(None)):
    """Export the filtered server list as a semicolon-separated CSV attachment.

    Anonymous visitors are redirected to ``/login``. The servers are requested
    from ``list_servers`` as a single page of up to 99999 rows sorted by
    hostname, and the payload is prefixed with a byte-order mark.
    """
    if not get_current_user(request):
        return RedirectResponse(url="/login")

    import csv
    import io

    filters = {
        "domain": domain,
        "env": env,
        "tier": tier,
        "etat": etat,
        "os": os,
        "owner": owner,
        "search": search,
    }
    servers, _total = list_servers(db, filters, page=1, per_page=99999,
                                   sort="hostname", sort_dir="asc")

    def _optional(row, attr):
        # Columns that may be absent from the row fall back to an empty cell.
        return getattr(row, attr, "") or ""

    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=";")
    writer.writerow([
        "Hostname", "FQDN", "OS", "Version OS", "Domaine", "Environnement",
        "Zone", "Tier", "Etat", "Owner patching", "Application",
    ])
    for srv in servers:
        writer.writerow([
            srv.hostname,
            _optional(srv, "fqdn"),
            srv.os_family or "",
            _optional(srv, "os_version"),
            _optional(srv, "domaine"),
            _optional(srv, "environnement"),
            _optional(srv, "zone_name"),
            srv.tier or "",
            srv.etat or "",
            srv.patch_os_owner or "",
            _optional(srv, "application_name"),
        ])

    # Leading BOM character goes in front of the whole CSV payload.
    payload = "\ufeff" + buffer.getvalue()
    return StreamingResponse(
        iter([payload]),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=serveurs.csv"},
    )
|
|
|
|
@router.get("/servers/{server_id}/detail", response_class=HTMLResponse)
async def server_detail(request: Request, server_id: int, db=Depends(get_db)):
    """Return the detail partial for one server.

    Responds with a short HTML message instead of the partial when the caller
    is not authenticated or when the server cannot be found.
    """
    if not get_current_user(request):
        return HTMLResponse("<p>Non autorise</p>")

    server = get_server_full(db, server_id)
    if not server:
        return HTMLResponse("<p>Serveur non trouve</p>")

    server_tags = get_server_tags(db, server.qid)
    server_ips = get_server_ips(db, server_id)

    from ..services.correspondance_service import get_server_links
    context = {
        "request": request,
        "s": server,
        "tags": server_tags,
        "ips": server_ips,
        "links": get_server_links(db, server_id),
    }
    return templates.TemplateResponse("partials/server_detail.html", context)
|
|
|
|
|
|
@router.get("/servers/{server_id}/edit", response_class=HTMLResponse)
async def server_edit(request: Request, server_id: int, db=Depends(get_db)):
    """Return the edit-form partial for one server.

    Responds with a short HTML message instead of the form when the caller is
    not authenticated or when the server cannot be found. The form receives
    the reference domains/environments, the server IPs, the active
    ``domain_ltd_list`` names, the zone names and the applications that have
    an ``itop_id``.
    """
    if not get_current_user(request):
        return HTMLResponse("<p>Non autorise</p>")

    server = get_server_full(db, server_id)
    if not server:
        return HTMLResponse("<p>Serveur non trouve</p>")

    domains, envs = get_reference_data(db)
    server_ips = get_server_ips(db, server_id)

    def _fetch_all(sql):
        # Run a parameterless SELECT and return all of its rows.
        return db.execute(text(sql)).fetchall()

    dns_rows = _fetch_all(
        "SELECT name FROM domain_ltd_list WHERE is_active = true ORDER BY name"
    )
    zone_rows = _fetch_all("SELECT name FROM zones ORDER BY name")
    application_rows = _fetch_all(
        "SELECT id, nom_court FROM applications WHERE itop_id IS NOT NULL ORDER BY nom_court"
    )

    context = {
        "request": request,
        "s": server,
        "domains": domains,
        "envs": envs,
        "ips": server_ips,
        "dns_list": [row.name for row in dns_rows],
        "zones_list": [row.name for row in zone_rows],
        "applications": application_rows,
    }
    return templates.TemplateResponse("partials/server_edit.html", context)
|
|
|
|
|
|
def _push_application_to_itop(db, server_id, app_itop_id):
    """Best-effort push of a server's application link to iTop.

    Looks up the server hostname, finds the iTop ``VirtualMachine`` with that
    name and replaces its ``applicationsolution_list`` with the given
    application (or an empty list when ``app_itop_id`` is falsy). Does nothing
    when the server row, the iTop credentials or the VM are missing. Any
    failure is logged and swallowed so the local update is never rolled back
    because of iTop.
    """
    import logging

    try:
        from ..services.itop_service import ITopClient
        from ..services.secrets_service import get_secret

        srv_row = db.execute(
            text("SELECT hostname FROM servers WHERE id=:id"), {"id": server_id}
        ).fetchone()
        if not srv_row:
            return

        url = get_secret(db, "itop_url")
        u = get_secret(db, "itop_user")
        p = get_secret(db, "itop_pass")
        if not (url and u and p):
            return

        client = ITopClient(url, u, p)
        # NOTE(review): the hostname is interpolated into the OQL query
        # unescaped — confirm hostnames can never contain a double quote.
        r = client._call("core/get", **{
            "class": "VirtualMachine",
            "key": f'SELECT VirtualMachine WHERE name = "{srv_row.hostname}"',
            "output_fields": "name",
        })
        if not r.get("objects"):
            return

        # Only the first matching VM is updated.
        vm_id = next(iter(r["objects"].values()))["key"]
        new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else []
        client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list})
    except Exception:
        # Best effort: keep the request alive, but leave a trace of the failure.
        logging.getLogger(__name__).exception(
            "iTop application push failed for server %s", server_id
        )


@router.put("/servers/{server_id}", response_class=HTMLResponse)
async def server_update(request: Request, server_id: int, db=Depends(get_db),
                        domain_code: str = Form(None), env_code: str = Form(None),
                        zone: str = Form(None), tier: str = Form(None), etat: str = Form(None),
                        patch_os_owner: str = Form(None), responsable_nom: str = Form(None),
                        referent_nom: str = Form(None), mode_operatoire: str = Form(None),
                        commentaire: str = Form(None),
                        ip_reelle: str = Form(None), ip_connexion: str = Form(None),
                        ssh_method: str = Form(None), domain_ltd: str = Form(None),
                        pref_patch_jour: str = Form(None), pref_patch_heure: str = Form(None),
                        application_id: str = Form(None)):
    """Apply the edit form to one server and return the refreshed detail partial.

    The caller must be authenticated and hold edit permission on "servers"
    (403 otherwise). Form fields are forwarded to ``update_server``. When
    ``application_id`` is submitted, the server's application columns are
    updated and the change is pushed to iTop on a best-effort basis; an empty
    or non-numeric value clears the application. Responds with a short HTML
    message when the server cannot be found.
    """
    user = get_current_user(request)
    if not user:
        return HTMLResponse("<p>Non autorise</p>")
    from ..dependencies import get_user_perms, can_edit
    if not can_edit(get_user_perms(db, user), "servers"):
        return HTMLResponse("<p>Permission refusee</p>", status_code=403)

    data = {
        "domain_code": domain_code, "env_code": env_code, "zone": zone,
        "tier": tier, "etat": etat, "patch_os_owner": patch_os_owner,
        "responsable_nom": responsable_nom, "referent_nom": referent_nom,
        "mode_operatoire": mode_operatoire, "commentaire": commentaire,
        "ip_reelle": ip_reelle, "ip_connexion": ip_connexion,
        "ssh_method": ssh_method, "domain_ltd": domain_ltd,
        "pref_patch_jour": pref_patch_jour, "pref_patch_heure": pref_patch_heure,
    }
    update_server(db, server_id, data, user.get("sub"))

    # Application (manual SecOps change): local update, then push to iTop.
    if application_id is not None:
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which would make int() raise.
        raw_app_id = application_id.strip()
        app_id_val = int(raw_app_id) if raw_app_id.isdecimal() else None
        app_itop_id = None
        app_name = None
        if app_id_val:
            row = db.execute(
                text("SELECT itop_id, nom_court FROM applications WHERE id=:id"),
                {"id": app_id_val},
            ).fetchone()
            if row:
                app_itop_id = row.itop_id
                app_name = row.nom_court
        db.execute(
            text("""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW()
                WHERE id=:sid"""),
            {"aid": app_id_val, "an": app_name, "sid": server_id},
        )
        db.commit()
        _push_application_to_itop(db, server_id, app_itop_id)

    s = get_server_full(db, server_id)
    if not s:
        # Same response as the detail/edit endpoints instead of failing on s.qid.
        return HTMLResponse("<p>Serveur non trouve</p>")
    tags = get_server_tags(db, s.qid)
    ips = get_server_ips(db, server_id)
    from ..services.correspondance_service import get_server_links
    links = get_server_links(db, server_id)
    return templates.TemplateResponse("partials/server_detail.html", {
        "request": request, "s": s, "tags": tags, "ips": ips, "links": links
    })
|
|
|
|
|
|
@router.post("/servers/bulk")
async def servers_bulk(request: Request, db=Depends(get_db),
                       server_ids: str = Form(""), bulk_field: str = Form(""),
                       bulk_value: str = Form("")):
    """Apply one field change to a comma-separated list of server ids.

    Supported ``bulk_field`` values are the plain columns ``tier``, ``etat``,
    ``patch_os_owner`` and ``licence_support``, plus ``domain_code`` and
    ``env_code`` which are resolved to a ``domain_env_id``. Every outcome is a
    303 redirect: to ``/login`` when anonymous, to ``/servers?msg=forbidden``
    without edit permission, to ``/servers`` on missing/invalid input, and to
    ``/servers?msg=bulk_<n>`` (n = number of ids submitted) after the commit.
    """
    user = get_current_user(request)
    if not user:
        # 303 so the browser follows with a GET; the default 307 would make it
        # re-POST the bulk form to /login.
        return RedirectResponse(url="/login", status_code=303)
    from ..dependencies import get_user_perms, can_edit
    if not can_edit(get_user_perms(db, user), "servers"):
        return RedirectResponse(url="/servers?msg=forbidden", status_code=303)
    if not server_ids or not bulk_field or not bulk_value:
        return RedirectResponse(url="/servers", status_code=303)

    # isdecimal() only accepts characters int() can parse; isdigit() also
    # accepts e.g. superscript digits, which would make int() raise.
    ids = [int(x) for x in server_ids.split(",") if x.strip().isdecimal()]
    if not ids:
        return RedirectResponse(url="/servers", status_code=303)

    if bulk_field in ("tier", "etat", "patch_os_owner", "licence_support"):
        # bulk_field is safe to interpolate: it was just matched against the
        # fixed column whitelist above.
        db.execute(text(f"UPDATE servers SET {bulk_field} = :val WHERE id = ANY(:ids)"),
                   {"val": bulk_value, "ids": ids})
    elif bulk_field == "domain_code":
        # Pick the domain's first environment by display_order.
        row = db.execute(text("""
            SELECT de.id FROM domain_environments de
            JOIN domains d ON de.domain_id = d.id
            JOIN environments e ON de.environment_id = e.id
            WHERE d.code = :dc ORDER BY e.display_order LIMIT 1
        """), {"dc": bulk_value}).fetchone()
        if row:
            db.execute(text("UPDATE servers SET domain_env_id = :deid WHERE id = ANY(:ids)"),
                       {"deid": row.id, "ids": ids})
    elif bulk_field == "env_code":
        # Each server keeps its current domain; only the environment changes.
        for sid in ids:
            srv = db.execute(text("""
                SELECT d.id as did FROM servers s
                JOIN domain_environments de ON s.domain_env_id = de.id
                JOIN domains d ON de.domain_id = d.id
                WHERE s.id = :sid
            """), {"sid": sid}).fetchone()
            if not srv:
                continue
            de = db.execute(text("""
                SELECT de.id FROM domain_environments de
                JOIN environments e ON de.environment_id = e.id
                WHERE de.domain_id = :did AND e.code = :ec
            """), {"did": srv.did, "ec": bulk_value}).fetchone()
            if de:
                db.execute(text("UPDATE servers SET domain_env_id = :deid WHERE id = :sid"),
                           {"deid": de.id, "sid": sid})
    else:
        # Unsupported field: treat as invalid input rather than reporting a
        # successful bulk update that changed nothing.
        return RedirectResponse(url="/servers", status_code=303)

    db.commit()
    return RedirectResponse(url=f"/servers?msg=bulk_{len(ids)}", status_code=303)
|
|
|
|
|
|
@router.post("/servers/{server_id}/sync-qualys", response_class=HTMLResponse)
async def server_sync_qualys(request: Request, server_id: int, db=Depends(get_db)):
    """Run a Qualys sync for one server and return the refreshed detail partial.

    Responds with a short HTML message when the caller is not authenticated.
    The ``msg`` and ``ok`` entries of the sync result are passed to the
    template as ``sync_msg`` and ``sync_ok``. If the server cannot be reloaded
    after the sync, empty tags and empty link lists are rendered.
    """
    if not get_current_user(request):
        return HTMLResponse("<p>Non autorise</p>")

    outcome = sync_server_qualys(db, server_id)
    server = get_server_full(db, server_id)

    if server:
        server_tags = get_server_tags(db, server.qid)
    else:
        server_tags = []
    server_ips = get_server_ips(db, server_id)

    from ..services.correspondance_service import get_server_links
    if server:
        server_links = get_server_links(db, server_id)
    else:
        server_links = {"as_prod": [], "as_nonprod": []}

    context = {
        "request": request,
        "s": server,
        "tags": server_tags,
        "ips": server_ips,
        "links": server_links,
        "sync_msg": outcome.get("msg"),
        "sync_ok": outcome.get("ok"),
    }
    return templates.TemplateResponse("partials/server_detail.html", context)
|