- BOC SAP: stop_order ajouté (SolMan→CM→CC→AS→CI→HANA), conforme doc v3.1.4 - 3 serveurs BOC HO ajoutés (vpbocarep1, vpbocasec1, vpbocjump1) - Patch waves: DNS (V1: 1+3, V2: 2+4), SMTP (V1: smtp2, V2: smtp1) - Colonnes Stop order / Start order dans Spécifiques - Campagne: colonne Zone (DMZ rouge, EMV jaune, LAN bleu) + KPI DMZ - DMZ: filtre par zone au lieu de domaine (27 serveurs récupérés) - Préférences patching: jour/heure éditables dans serveurs, hérités en campagne - KPIs campagne en flex (une seule ligne) - Limites intervenants: layout compact (max-width 400px) - Tri campagne: domaine → hors-prod/prod → hostname - Opérateur peut prendre en in_progress + planned - Actions bulk campagne: prendre/assigner/exclure en masse - Formulaires inline: fix Alpine.js → JS pur (display:none par défaut) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
169 lines
7.3 KiB
Python
169 lines
7.3 KiB
Python
"""Router serveurs — CRUD + detail + edit via HTMX"""
|
|
from fastapi import APIRouter, Request, Depends, Query, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from ..dependencies import get_db, get_current_user
|
|
from ..services.server_service import (
|
|
get_server_full, get_server_tags, get_server_ips,
|
|
list_servers, update_server, get_reference_data
|
|
)
|
|
from ..services.qualys_service import sync_server_qualys
|
|
from ..config import APP_NAME
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
@router.get("/servers", response_class=HTMLResponse)
|
|
async def servers_list(request: Request, db=Depends(get_db),
|
|
domain: str = Query(None), env: str = Query(None),
|
|
tier: str = Query(None), etat: str = Query(None),
|
|
search: str = Query(None), page: int = Query(1),
|
|
sort: str = Query("hostname"), sort_dir: str = Query("asc")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
|
|
filters = {"domain": domain, "env": env, "tier": tier, "etat": etat, "search": search}
|
|
servers, total = list_servers(db, filters, page, sort=sort, sort_dir=sort_dir)
|
|
domains_list, envs_list = get_reference_data(db)
|
|
|
|
return templates.TemplateResponse("servers.html", {
|
|
"request": request, "user": user, "app_name": APP_NAME,
|
|
"servers": servers, "total": total, "page": page, "per_page": 50,
|
|
"domains_list": domains_list, "envs_list": envs_list, "filters": filters,
|
|
"sort": sort, "sort_dir": sort_dir,
|
|
})
|
|
|
|
|
|
@router.get("/servers/{server_id}/detail", response_class=HTMLResponse)
|
|
async def server_detail(request: Request, server_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return HTMLResponse("<p>Non autorise</p>")
|
|
s = get_server_full(db, server_id)
|
|
if not s:
|
|
return HTMLResponse("<p>Serveur non trouve</p>")
|
|
tags = get_server_tags(db, s.qid)
|
|
ips = get_server_ips(db, server_id)
|
|
return templates.TemplateResponse("partials/server_detail.html", {
|
|
"request": request, "s": s, "tags": tags, "ips": ips
|
|
})
|
|
|
|
|
|
@router.get("/servers/{server_id}/edit", response_class=HTMLResponse)
|
|
async def server_edit(request: Request, server_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return HTMLResponse("<p>Non autorise</p>")
|
|
s = get_server_full(db, server_id)
|
|
if not s:
|
|
return HTMLResponse("<p>Serveur non trouve</p>")
|
|
domains, envs = get_reference_data(db)
|
|
ips = get_server_ips(db, server_id)
|
|
return templates.TemplateResponse("partials/server_edit.html", {
|
|
"request": request, "s": s, "domains": domains, "envs": envs, "ips": ips
|
|
})
|
|
|
|
|
|
@router.put("/servers/{server_id}", response_class=HTMLResponse)
|
|
async def server_update(request: Request, server_id: int, db=Depends(get_db),
|
|
domain_code: str = Form(None), env_code: str = Form(None),
|
|
zone: str = Form(None), tier: str = Form(None), etat: str = Form(None),
|
|
patch_os_owner: str = Form(None), responsable_nom: str = Form(None),
|
|
referent_nom: str = Form(None), mode_operatoire: str = Form(None),
|
|
commentaire: str = Form(None),
|
|
ip_reelle: str = Form(None), ip_connexion: str = Form(None),
|
|
ssh_method: str = Form(None),
|
|
pref_patch_jour: str = Form(None), pref_patch_heure: str = Form(None)):
|
|
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return HTMLResponse("<p>Non autorise</p>")
|
|
|
|
data = {
|
|
"domain_code": domain_code, "env_code": env_code, "zone": zone,
|
|
"tier": tier, "etat": etat, "patch_os_owner": patch_os_owner,
|
|
"responsable_nom": responsable_nom, "referent_nom": referent_nom,
|
|
"mode_operatoire": mode_operatoire, "commentaire": commentaire,
|
|
"ip_reelle": ip_reelle, "ip_connexion": ip_connexion,
|
|
"ssh_method": ssh_method,
|
|
"pref_patch_jour": pref_patch_jour, "pref_patch_heure": pref_patch_heure,
|
|
}
|
|
update_server(db, server_id, data, user.get("sub"))
|
|
|
|
s = get_server_full(db, server_id)
|
|
tags = get_server_tags(db, s.qid)
|
|
ips = get_server_ips(db, server_id)
|
|
return templates.TemplateResponse("partials/server_detail.html", {
|
|
"request": request, "s": s, "tags": tags, "ips": ips
|
|
})
|
|
|
|
|
|
@router.post("/servers/bulk")
|
|
async def servers_bulk(request: Request, db=Depends(get_db),
|
|
server_ids: str = Form(""), bulk_field: str = Form(""),
|
|
bulk_value: str = Form("")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
if not server_ids or not bulk_field or not bulk_value:
|
|
return RedirectResponse(url="/servers", status_code=303)
|
|
|
|
ids = [int(x) for x in server_ids.split(",") if x.strip().isdigit()]
|
|
if not ids:
|
|
return RedirectResponse(url="/servers", status_code=303)
|
|
|
|
from sqlalchemy import text as sqlt
|
|
|
|
if bulk_field in ("tier", "etat", "patch_os_owner", "licence_support"):
|
|
db.execute(sqlt(f"UPDATE servers SET {bulk_field} = :val WHERE id = ANY(:ids)"),
|
|
{"val": bulk_value, "ids": ids})
|
|
elif bulk_field == "domain_code":
|
|
# Trouver le domain_env_id correspondant (prod par defaut)
|
|
row = db.execute(sqlt("""
|
|
SELECT de.id FROM domain_environments de
|
|
JOIN domains d ON de.domain_id = d.id
|
|
JOIN environments e ON de.environment_id = e.id
|
|
WHERE d.code = :dc ORDER BY e.display_order LIMIT 1
|
|
"""), {"dc": bulk_value}).fetchone()
|
|
if row:
|
|
db.execute(sqlt("UPDATE servers SET domain_env_id = :deid WHERE id = ANY(:ids)"),
|
|
{"deid": row.id, "ids": ids})
|
|
elif bulk_field == "env_code":
|
|
# Pour chaque serveur, garder son domaine mais changer l'env
|
|
for sid in ids:
|
|
srv = db.execute(sqlt("""
|
|
SELECT d.id as did FROM servers s
|
|
JOIN domain_environments de ON s.domain_env_id = de.id
|
|
JOIN domains d ON de.domain_id = d.id
|
|
WHERE s.id = :sid
|
|
"""), {"sid": sid}).fetchone()
|
|
if srv:
|
|
de = db.execute(sqlt("""
|
|
SELECT de.id FROM domain_environments de
|
|
JOIN environments e ON de.environment_id = e.id
|
|
WHERE de.domain_id = :did AND e.code = :ec
|
|
"""), {"did": srv.did, "ec": bulk_value}).fetchone()
|
|
if de:
|
|
db.execute(sqlt("UPDATE servers SET domain_env_id = :deid WHERE id = :sid"),
|
|
{"deid": de.id, "sid": sid})
|
|
|
|
db.commit()
|
|
return RedirectResponse(url=f"/servers?msg=bulk_{len(ids)}", status_code=303)
|
|
|
|
|
|
@router.post("/servers/{server_id}/sync-qualys", response_class=HTMLResponse)
|
|
async def server_sync_qualys(request: Request, server_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return HTMLResponse("<p>Non autorise</p>")
|
|
result = sync_server_qualys(db, server_id)
|
|
s = get_server_full(db, server_id)
|
|
tags = get_server_tags(db, s.qid) if s else []
|
|
ips = get_server_ips(db, server_id)
|
|
return templates.TemplateResponse("partials/server_detail.html", {
|
|
"request": request, "s": s, "tags": tags, "ips": ips,
|
|
"sync_msg": result.get("msg"), "sync_ok": result.get("ok"),
|
|
})
|