- Admin applications: CRUD module (list/add/edit/delete/assign/multi-app) avec push iTop bidirectionnel (applications.py + 3 templates) - Correspondance prod<->hors-prod: migration vers server_correspondance globale, suppression ancien code quickwin, ajout filtre environnement et solution applicative, colonne environnement dans builder - Servers page: colonne application_name + equivalent(s) via get_links_bulk, filtre application_id, push iTop sur changement application - Patching: bulk_update_application, bulk_update_excludes, validations - Fix paramiko sftp.put (remote_path -> positional arg) - Tools: wiki_to_pdf.py (DokuWiki -> PDF) + generate_ppt.py (PPTX 19 slides DSI patching) + contenu source (processus_patching.txt, script_presentation.txt) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
682 lines
31 KiB
Python
682 lines
31 KiB
Python
"""Router Patching — exclusions, correspondance prod↔hors-prod, validations."""
|
|
from fastapi import APIRouter, Request, Depends, Query, Form
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, base_context
|
|
from ..services import correspondance_service as corr
|
|
from ..config import APP_NAME
|
|
|
|
router = APIRouter()
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
def _can_edit_excludes(perms):
|
|
"""Peut éditer les exclusions : admin, coordinator, operator (pas viewer)."""
|
|
return can_edit(perms, "servers") or can_edit(perms, "campaigns") or can_edit(perms, "quickwin")
|
|
|
|
|
|
@router.get("/patching/config-exclusions", response_class=HTMLResponse)
|
|
async def config_exclusions_page(request: Request, db=Depends(get_db),
|
|
search: str = Query(""),
|
|
domain: str = Query(""),
|
|
env: str = Query(""),
|
|
zone: str = Query(""),
|
|
tier: str = Query(""),
|
|
os: str = Query(""),
|
|
application: str = Query(""),
|
|
has_excludes: str = Query(""),
|
|
page: int = Query(1),
|
|
per_page: int = Query(30)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
# Requête principale
|
|
where = ["1=1"]
|
|
params = {}
|
|
if search:
|
|
where.append("s.hostname ILIKE :s"); params["s"] = f"%{search}%"
|
|
if domain:
|
|
where.append("d.code = :d"); params["d"] = domain
|
|
if env:
|
|
where.append("e.code = :e"); params["e"] = env
|
|
if zone:
|
|
where.append("z.name = :z"); params["z"] = zone
|
|
if tier:
|
|
where.append("s.tier = :t"); params["t"] = tier
|
|
if os:
|
|
where.append("s.os_family = :o"); params["o"] = os
|
|
if application:
|
|
where.append("s.application_name = :app"); params["app"] = application
|
|
if has_excludes == "yes":
|
|
where.append("s.patch_excludes IS NOT NULL AND s.patch_excludes != ''")
|
|
elif has_excludes == "no":
|
|
where.append("(s.patch_excludes IS NULL OR s.patch_excludes = '')")
|
|
|
|
wc = " AND ".join(where)
|
|
|
|
# Count
|
|
total = db.execute(text(f"""
|
|
SELECT COUNT(*) FROM servers s
|
|
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
|
|
LEFT JOIN domains d ON de.domain_id = d.id
|
|
LEFT JOIN environments e ON de.environment_id = e.id
|
|
LEFT JOIN zones z ON s.zone_id = z.id
|
|
WHERE {wc}
|
|
"""), params).scalar() or 0
|
|
|
|
per_page = max(10, min(per_page, 200))
|
|
total_pages = max(1, (total + per_page - 1) // per_page)
|
|
page = max(1, min(page, total_pages))
|
|
offset = (page - 1) * per_page
|
|
|
|
rows = db.execute(text(f"""
|
|
SELECT s.id, s.hostname, s.os_family, s.os_version, s.tier, s.etat,
|
|
s.patch_excludes, s.application_name,
|
|
d.name as domain_name, d.code as domain_code,
|
|
e.name as env_name, e.code as env_code,
|
|
z.name as zone_name
|
|
FROM servers s
|
|
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
|
|
LEFT JOIN domains d ON de.domain_id = d.id
|
|
LEFT JOIN environments e ON de.environment_id = e.id
|
|
LEFT JOIN zones z ON s.zone_id = z.id
|
|
WHERE {wc}
|
|
ORDER BY s.hostname
|
|
LIMIT :limit OFFSET :offset
|
|
"""), {**params, "limit": per_page, "offset": offset}).fetchall()
|
|
|
|
# Listes pour filtres
|
|
domains = db.execute(text("SELECT code, name FROM domains ORDER BY name")).fetchall()
|
|
envs = db.execute(text("SELECT code, name FROM environments ORDER BY name")).fetchall()
|
|
zones = db.execute(text("SELECT DISTINCT name FROM zones ORDER BY name")).fetchall()
|
|
applications = db.execute(text("""SELECT application_name, COUNT(*) as c FROM servers
|
|
WHERE application_name IS NOT NULL AND application_name != ''
|
|
GROUP BY application_name ORDER BY application_name""")).fetchall()
|
|
all_apps = db.execute(text("""SELECT id, nom_court FROM applications
|
|
WHERE itop_id IS NOT NULL ORDER BY nom_court""")).fetchall()
|
|
|
|
# Stats globales
|
|
stats = {
|
|
"total_servers": db.execute(text("SELECT COUNT(*) FROM servers")).scalar(),
|
|
"with_excludes": db.execute(text("SELECT COUNT(*) FROM servers WHERE patch_excludes IS NOT NULL AND patch_excludes != ''")).scalar(),
|
|
}
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({
|
|
"app_name": APP_NAME,
|
|
"servers": rows, "total": total,
|
|
"page": page, "per_page": per_page, "total_pages": total_pages,
|
|
"filters": {"search": search, "domain": domain, "env": env,
|
|
"zone": zone, "tier": tier, "os": os,
|
|
"application": application, "has_excludes": has_excludes},
|
|
"domains": domains, "envs": envs, "zones": [z.name for z in zones],
|
|
"applications": applications, "all_apps": all_apps,
|
|
"stats": stats,
|
|
"msg": request.query_params.get("msg"),
|
|
})
|
|
return templates.TemplateResponse("patching_config_exclusions.html", ctx)
|
|
|
|
|
|
@router.post("/patching/config-exclusions/{server_id}/save")
|
|
async def save_server_excludes(request: Request, server_id: int, db=Depends(get_db),
|
|
patch_excludes: str = Form("")):
|
|
"""Enregistre les exclusions d'un serveur + push iTop."""
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
|
|
# Normalise : split sur espaces/newlines, dédoublonne, rejoint avec un espace
|
|
parts = [p.strip() for p in patch_excludes.replace("\n", " ").replace("\t", " ").split() if p.strip()]
|
|
seen = set()
|
|
cleaned = []
|
|
for p in parts:
|
|
if p not in seen:
|
|
seen.add(p)
|
|
cleaned.append(p)
|
|
new_val = " ".join(cleaned)
|
|
srv = db.execute(text("SELECT id, hostname FROM servers WHERE id=:id"), {"id": server_id}).fetchone()
|
|
if not srv:
|
|
return JSONResponse({"ok": False, "msg": "Serveur introuvable"}, status_code=404)
|
|
|
|
# 1. Maj base locale
|
|
db.execute(text("UPDATE servers SET patch_excludes=:pe, updated_at=NOW() WHERE id=:id"),
|
|
{"pe": new_val, "id": server_id})
|
|
db.commit()
|
|
|
|
# 2. Push iTop immédiat (best effort)
|
|
itop_result = {"pushed": False, "msg": ""}
|
|
try:
|
|
from ..services.itop_service import ITopClient
|
|
from ..services.secrets_service import get_secret
|
|
url = get_secret(db, "itop_url")
|
|
u = get_secret(db, "itop_user")
|
|
p = get_secret(db, "itop_pass")
|
|
if url and u and p:
|
|
client = ITopClient(url, u, p)
|
|
r = client._call("core/get", **{"class": "VirtualMachine",
|
|
"key": f'SELECT VirtualMachine WHERE name = "{srv.hostname}"', "output_fields": "name"})
|
|
if r.get("objects"):
|
|
vm_id = list(r["objects"].values())[0]["key"]
|
|
upd = client.update("VirtualMachine", vm_id, {"patch_excludes": new_val})
|
|
if upd.get("code") == 0:
|
|
itop_result = {"pushed": True, "msg": "Poussé vers iTop"}
|
|
else:
|
|
itop_result = {"pushed": False, "msg": f"iTop: {upd.get('message','')[:80]}"}
|
|
except Exception as e:
|
|
itop_result = {"pushed": False, "msg": f"iTop error: {str(e)[:80]}"}
|
|
|
|
return JSONResponse({"ok": True, "patch_excludes": new_val, "itop": itop_result})
|
|
|
|
|
|
@router.post("/patching/config-exclusions/bulk")
|
|
async def bulk_update_excludes(request: Request, db=Depends(get_db)):
|
|
"""Bulk add/remove pattern sur plusieurs serveurs."""
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
|
|
body = await request.json()
|
|
server_ids = body.get("server_ids", [])
|
|
pattern = (body.get("pattern") or "").strip()
|
|
action = body.get("action", "add") # "add" | "remove" | "replace"
|
|
|
|
if not server_ids or not pattern and action != "replace":
|
|
return JSONResponse({"ok": False, "msg": "Paramètres manquants"})
|
|
|
|
ids = [int(x) for x in server_ids if str(x).isdigit()]
|
|
if not ids:
|
|
return JSONResponse({"ok": False, "msg": "Aucun serveur valide"})
|
|
|
|
placeholders = ",".join(str(i) for i in ids)
|
|
rows = db.execute(text(f"SELECT id, hostname, patch_excludes FROM servers WHERE id IN ({placeholders})")).fetchall()
|
|
|
|
updated = 0
|
|
for r in rows:
|
|
current = (r.patch_excludes or "").strip()
|
|
parts = current.split() if current else []
|
|
if action == "add":
|
|
if pattern not in parts:
|
|
parts.append(pattern)
|
|
elif action == "remove":
|
|
parts = [p for p in parts if p != pattern]
|
|
elif action == "replace":
|
|
parts = pattern.split()
|
|
new_val = " ".join(parts)
|
|
if new_val != current:
|
|
db.execute(text("UPDATE servers SET patch_excludes=:pe, updated_at=NOW() WHERE id=:id"),
|
|
{"pe": new_val, "id": r.id})
|
|
updated += 1
|
|
db.commit()
|
|
|
|
# Push iTop en batch (best effort, async conceptually)
|
|
itop_pushed = 0
|
|
itop_errors = 0
|
|
try:
|
|
from ..services.itop_service import ITopClient
|
|
from ..services.secrets_service import get_secret
|
|
url = get_secret(db, "itop_url")
|
|
u = get_secret(db, "itop_user")
|
|
p = get_secret(db, "itop_pass")
|
|
if url and u and p:
|
|
client = ITopClient(url, u, p)
|
|
# Refresh après update
|
|
rows2 = db.execute(text(f"SELECT hostname, patch_excludes FROM servers WHERE id IN ({placeholders})")).fetchall()
|
|
for r in rows2:
|
|
resp = client._call("core/get", **{"class": "VirtualMachine",
|
|
"key": f'SELECT VirtualMachine WHERE name = "{r.hostname}"', "output_fields": "name"})
|
|
if resp.get("objects"):
|
|
vm_id = list(resp["objects"].values())[0]["key"]
|
|
up = client.update("VirtualMachine", vm_id, {"patch_excludes": r.patch_excludes or ""})
|
|
if up.get("code") == 0:
|
|
itop_pushed += 1
|
|
else:
|
|
itop_errors += 1
|
|
except Exception:
|
|
pass
|
|
|
|
return JSONResponse({"ok": True, "updated": updated, "itop_pushed": itop_pushed, "itop_errors": itop_errors})
|
|
|
|
|
|
@router.post("/patching/config-exclusions/bulk-application")
|
|
async def bulk_update_application(request: Request, db=Depends(get_db)):
|
|
"""Bulk changement de solution applicative sur plusieurs serveurs."""
|
|
from fastapi.responses import JSONResponse
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
|
|
body = await request.json()
|
|
server_ids = body.get("server_ids", [])
|
|
application_id = body.get("application_id") # int ou None/"" pour désassocier
|
|
|
|
ids = [int(x) for x in server_ids if str(x).isdigit()]
|
|
if not ids:
|
|
return JSONResponse({"ok": False, "msg": "Aucun serveur"})
|
|
|
|
app_id_val = None
|
|
app_itop_id = None
|
|
app_name = None
|
|
if application_id and str(application_id).strip().isdigit():
|
|
app_id_val = int(application_id)
|
|
row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"),
|
|
{"id": app_id_val}).fetchone()
|
|
if row:
|
|
app_itop_id = row.itop_id
|
|
app_name = row.nom_court
|
|
else:
|
|
return JSONResponse({"ok": False, "msg": "Application introuvable"})
|
|
|
|
placeholders = ",".join(str(i) for i in ids)
|
|
db.execute(text(f"""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW()
|
|
WHERE id IN ({placeholders})"""), {"aid": app_id_val, "an": app_name})
|
|
db.commit()
|
|
updated = len(ids)
|
|
|
|
# Push iTop
|
|
itop_pushed = 0
|
|
itop_errors = 0
|
|
try:
|
|
from ..services.itop_service import ITopClient
|
|
from ..services.secrets_service import get_secret
|
|
url = get_secret(db, "itop_url")
|
|
u = get_secret(db, "itop_user")
|
|
p = get_secret(db, "itop_pass")
|
|
if url and u and p:
|
|
client = ITopClient(url, u, p)
|
|
new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else []
|
|
hosts = db.execute(text(f"SELECT hostname FROM servers WHERE id IN ({placeholders})")).fetchall()
|
|
for h in hosts:
|
|
try:
|
|
rr = client._call("core/get", **{"class": "VirtualMachine",
|
|
"key": f'SELECT VirtualMachine WHERE name = "{h.hostname}"', "output_fields": "name"})
|
|
if rr.get("objects"):
|
|
vm_id = list(rr["objects"].values())[0]["key"]
|
|
up = client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list})
|
|
if up.get("code") == 0:
|
|
itop_pushed += 1
|
|
else:
|
|
itop_errors += 1
|
|
except Exception:
|
|
itop_errors += 1
|
|
except Exception:
|
|
pass
|
|
|
|
return JSONResponse({"ok": True, "updated": updated, "itop_pushed": itop_pushed, "itop_errors": itop_errors,
|
|
"app_name": app_name or "(aucune)"})
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════
|
|
# Correspondance prod ↔ hors-prod
|
|
# ═══════════════════════════════════════════════════════
|
|
|
|
@router.get("/patching/correspondance", response_class=HTMLResponse)
|
|
async def correspondance_page(request: Request, db=Depends(get_db),
|
|
search: str = Query(""), application: str = Query(""),
|
|
domain: str = Query(""), env: str = Query("")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms) and not can_view(perms, "campaigns"):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
servers = corr.get_servers_for_builder(db, search=search, app=application,
|
|
domain=domain, env=env)
|
|
server_links = corr.get_links_bulk(db, [s.id for s in servers])
|
|
|
|
applications = db.execute(text("""SELECT DISTINCT application_name FROM servers
|
|
WHERE application_name IS NOT NULL AND application_name != ''
|
|
ORDER BY application_name""")).fetchall()
|
|
envs = db.execute(text("SELECT DISTINCT name FROM environments ORDER BY name")).fetchall()
|
|
domains = db.execute(text("SELECT DISTINCT name FROM domains ORDER BY name")).fetchall()
|
|
all_apps = db.execute(text("""SELECT id, nom_court FROM applications
|
|
WHERE itop_id IS NOT NULL ORDER BY nom_court""")).fetchall()
|
|
|
|
# Stats globales
|
|
stats = {
|
|
"total_links": db.execute(text("SELECT COUNT(*) FROM server_correspondance")).scalar() or 0,
|
|
"filtered": len(servers),
|
|
}
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({"app_name": APP_NAME, "servers": servers, "stats": stats,
|
|
"server_links": server_links,
|
|
"applications": applications,
|
|
"envs": [e.name for e in envs],
|
|
"domains": [d.name for d in domains],
|
|
"all_apps": all_apps,
|
|
"search": search, "application": application,
|
|
"domain": domain, "env": env,
|
|
"can_edit": _can_edit_excludes(perms),
|
|
"msg": request.query_params.get("msg", "")})
|
|
return templates.TemplateResponse("patching_correspondance.html", ctx)
|
|
|
|
|
|
@router.post("/patching/correspondance/bulk-env")
|
|
async def correspondance_bulk_env(request: Request, db=Depends(get_db)):
|
|
"""Change l'environnement réel de N serveurs (PatchCenter + push iTop)."""
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
body = await request.json()
|
|
server_ids = [int(x) for x in body.get("server_ids", []) if str(x).isdigit()]
|
|
env_name = (body.get("env_name") or "").strip()
|
|
if not server_ids or not env_name:
|
|
return JSONResponse({"ok": False, "msg": "Paramètres manquants"})
|
|
|
|
# Trouver env_id
|
|
env_row = db.execute(text("SELECT id FROM environments WHERE name=:n"), {"n": env_name}).fetchone()
|
|
if not env_row:
|
|
return JSONResponse({"ok": False, "msg": f"Env '{env_name}' introuvable"})
|
|
env_id = env_row.id
|
|
|
|
placeholders = ",".join(str(i) for i in server_ids)
|
|
# Pour chaque serveur : trouver/créer le domain_env correspondant et l'affecter
|
|
updated = 0
|
|
srvs = db.execute(text(f"""SELECT s.id, s.hostname, s.domain_env_id, de.domain_id
|
|
FROM servers s LEFT JOIN domain_environments de ON s.domain_env_id = de.id
|
|
WHERE s.id IN ({placeholders})""")).fetchall()
|
|
|
|
for s in srvs:
|
|
if not s.domain_id:
|
|
continue # pas de domaine actuel, skip
|
|
# Trouver ou créer domain_environments(domain_id, env_id)
|
|
de = db.execute(text("""SELECT id FROM domain_environments
|
|
WHERE domain_id=:d AND environment_id=:e"""),
|
|
{"d": s.domain_id, "e": env_id}).fetchone()
|
|
if not de:
|
|
db.execute(text("""INSERT INTO domain_environments (domain_id, environment_id)
|
|
VALUES (:d, :e)"""), {"d": s.domain_id, "e": env_id})
|
|
db.commit()
|
|
de = db.execute(text("""SELECT id FROM domain_environments
|
|
WHERE domain_id=:d AND environment_id=:e"""),
|
|
{"d": s.domain_id, "e": env_id}).fetchone()
|
|
db.execute(text("UPDATE servers SET domain_env_id=:de_id, updated_at=NOW() WHERE id=:sid"),
|
|
{"de_id": de.id, "sid": s.id})
|
|
updated += 1
|
|
db.commit()
|
|
|
|
# Push iTop
|
|
itop_pushed = 0
|
|
itop_errors = 0
|
|
try:
|
|
from ..services.itop_service import ITopClient
|
|
from ..services.secrets_service import get_secret
|
|
url = get_secret(db, "itop_url")
|
|
u = get_secret(db, "itop_user")
|
|
p = get_secret(db, "itop_pass")
|
|
if url and u and p:
|
|
client = ITopClient(url, u, p)
|
|
for s in srvs:
|
|
try:
|
|
rr = client._call("core/get", **{"class": "VirtualMachine",
|
|
"key": f'SELECT VirtualMachine WHERE name = "{s.hostname}"', "output_fields": "name"})
|
|
if rr.get("objects"):
|
|
vm_id = list(rr["objects"].values())[0]["key"]
|
|
upd = client.update("VirtualMachine", vm_id, {
|
|
"environnement_id": f"SELECT Environnement WHERE name = '{env_name}'"
|
|
})
|
|
if upd.get("code") == 0:
|
|
itop_pushed += 1
|
|
else:
|
|
itop_errors += 1
|
|
except Exception:
|
|
itop_errors += 1
|
|
except Exception:
|
|
pass
|
|
|
|
return JSONResponse({"ok": True, "updated": updated,
|
|
"itop_pushed": itop_pushed, "itop_errors": itop_errors,
|
|
"env_name": env_name})
|
|
|
|
|
|
@router.post("/patching/correspondance/bulk-application")
|
|
async def correspondance_bulk_app(request: Request, db=Depends(get_db)):
|
|
"""Change la solution applicative de N serveurs (PatchCenter + push iTop)."""
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
body = await request.json()
|
|
server_ids = [int(x) for x in body.get("server_ids", []) if str(x).isdigit()]
|
|
application_id = body.get("application_id")
|
|
if not server_ids:
|
|
return JSONResponse({"ok": False, "msg": "Aucun serveur"})
|
|
|
|
app_id_val = None
|
|
app_itop_id = None
|
|
app_name = None
|
|
if application_id and str(application_id).strip().isdigit():
|
|
app_id_val = int(application_id)
|
|
row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"),
|
|
{"id": app_id_val}).fetchone()
|
|
if row:
|
|
app_itop_id = row.itop_id
|
|
app_name = row.nom_court
|
|
else:
|
|
return JSONResponse({"ok": False, "msg": "Application introuvable"})
|
|
|
|
placeholders = ",".join(str(i) for i in server_ids)
|
|
db.execute(text(f"""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW()
|
|
WHERE id IN ({placeholders})"""), {"aid": app_id_val, "an": app_name})
|
|
db.commit()
|
|
|
|
itop_pushed = 0
|
|
itop_errors = 0
|
|
try:
|
|
from ..services.itop_service import ITopClient
|
|
from ..services.secrets_service import get_secret
|
|
url = get_secret(db, "itop_url")
|
|
u = get_secret(db, "itop_user")
|
|
p = get_secret(db, "itop_pass")
|
|
if url and u and p:
|
|
client = ITopClient(url, u, p)
|
|
new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else []
|
|
hosts = db.execute(text(f"SELECT hostname FROM servers WHERE id IN ({placeholders})")).fetchall()
|
|
for h in hosts:
|
|
try:
|
|
rr = client._call("core/get", **{"class": "VirtualMachine",
|
|
"key": f'SELECT VirtualMachine WHERE name = "{h.hostname}"', "output_fields": "name"})
|
|
if rr.get("objects"):
|
|
vm_id = list(rr["objects"].values())[0]["key"]
|
|
up = client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list})
|
|
if up.get("code") == 0:
|
|
itop_pushed += 1
|
|
else:
|
|
itop_errors += 1
|
|
except Exception:
|
|
itop_errors += 1
|
|
except Exception:
|
|
pass
|
|
|
|
return JSONResponse({"ok": True, "updated": len(server_ids),
|
|
"itop_pushed": itop_pushed, "itop_errors": itop_errors,
|
|
"app_name": app_name or "(aucune)"})
|
|
|
|
|
|
@router.post("/patching/correspondance/bulk-create")
|
|
async def correspondance_bulk_create(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
body = await request.json()
|
|
prod_ids = [int(x) for x in body.get("prod_ids", []) if str(x).isdigit()]
|
|
nonprod_ids = [int(x) for x in body.get("nonprod_ids", []) if str(x).isdigit()]
|
|
env_labels = body.get("env_labels", {})
|
|
if not prod_ids or not nonprod_ids:
|
|
return JSONResponse({"ok": False, "msg": "Au moins 1 prod et 1 non-prod requis"})
|
|
r = corr.bulk_create_correspondance(db, prod_ids, nonprod_ids, env_labels, user.get("uid"))
|
|
return JSONResponse({"ok": True, **r})
|
|
|
|
|
|
@router.post("/patching/correspondance/auto-detect")
|
|
async def correspondance_auto_detect(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
stats = corr.detect_correspondances(db)
|
|
return JSONResponse({"ok": True, **{k: v for k, v in stats.items() if k != "plan"}})
|
|
|
|
|
|
@router.post("/patching/correspondance/link")
|
|
async def correspondance_link(request: Request, db=Depends(get_db),
|
|
prod_id: int = Form(...), nonprod_id: int = Form(...),
|
|
env_code: str = Form(""), note: str = Form("")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
corr.create_manual_link(db, prod_id, nonprod_id, env_code, note, user.get("uid"))
|
|
return JSONResponse({"ok": True})
|
|
|
|
|
|
@router.post("/patching/correspondance/link-by-host")
|
|
async def correspondance_link_by_host(request: Request, db=Depends(get_db),
|
|
prod_id: int = Form(...),
|
|
nonprod_hostname: str = Form(...),
|
|
env_code: str = Form(""), note: str = Form("")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
row = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"),
|
|
{"h": nonprod_hostname.strip()}).fetchone()
|
|
if not row:
|
|
return JSONResponse({"ok": False, "msg": f"Serveur '{nonprod_hostname}' introuvable"})
|
|
corr.create_manual_link(db, prod_id, row.id, env_code, note, user.get("uid"))
|
|
return JSONResponse({"ok": True})
|
|
|
|
|
|
@router.post("/patching/correspondance/{corr_id}/delete")
|
|
async def correspondance_delete(request: Request, corr_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
corr.delete_link(db, corr_id)
|
|
return JSONResponse({"ok": True})
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════
|
|
# Validations post-patching
|
|
# ═══════════════════════════════════════════════════════
|
|
|
|
@router.get("/patching/validations", response_class=HTMLResponse)
|
|
async def validations_page(request: Request, db=Depends(get_db),
|
|
status: str = Query("en_attente"),
|
|
campaign_id: int = Query(None),
|
|
env: str = Query("")):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return RedirectResponse(url="/dashboard")
|
|
|
|
validations = corr.get_pending_validations(db, env=env, campaign_id=campaign_id, status=status)
|
|
|
|
# Contacts validateurs (responsables + référents)
|
|
contacts = db.execute(text("""SELECT id, name, email, role, team
|
|
FROM contacts WHERE is_active=true
|
|
AND role IN ('responsable_applicatif','responsable_domaine','referent_technique','ra_prod','ra_test')
|
|
ORDER BY name""")).fetchall()
|
|
|
|
stats = {
|
|
"en_attente": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='en_attente'")).scalar() or 0,
|
|
"validated_ok": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='validated_ok'")).scalar() or 0,
|
|
"validated_ko": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='validated_ko'")).scalar() or 0,
|
|
"forced": db.execute(text("SELECT COUNT(*) FROM patch_validation WHERE status='forced'")).scalar() or 0,
|
|
}
|
|
|
|
envs = db.execute(text("SELECT DISTINCT name FROM environments ORDER BY name")).fetchall()
|
|
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({"app_name": APP_NAME, "validations": validations, "contacts": contacts,
|
|
"stats": stats, "status": status, "campaign_id": campaign_id, "env": env,
|
|
"envs": [e.name for e in envs],
|
|
"can_force": can_edit(perms, "campaigns") or user.get("role") == "admin",
|
|
"msg": request.query_params.get("msg", "")})
|
|
return templates.TemplateResponse("patching_validations.html", ctx)
|
|
|
|
|
|
@router.post("/patching/validations/mark")
|
|
async def validations_mark(request: Request, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return JSONResponse({"ok": False}, status_code=401)
|
|
perms = get_user_perms(db, user)
|
|
if not _can_edit_excludes(perms):
|
|
return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)
|
|
|
|
body = await request.json()
|
|
ids = body.get("validation_ids", [])
|
|
status = body.get("status", "validated_ok")
|
|
contact_id = body.get("contact_id")
|
|
forced_reason = body.get("forced_reason", "")
|
|
notes = body.get("notes", "")
|
|
|
|
if status not in ("validated_ok", "validated_ko", "forced"):
|
|
return JSONResponse({"ok": False, "msg": "Status invalide"})
|
|
if status == "forced" and not forced_reason.strip():
|
|
return JSONResponse({"ok": False, "msg": "Raison obligatoire pour forcer"})
|
|
if status in ("validated_ok", "validated_ko") and not contact_id:
|
|
return JSONResponse({"ok": False, "msg": "Validateur obligatoire"})
|
|
|
|
validator_name = None
|
|
if contact_id:
|
|
row = db.execute(text("SELECT name FROM contacts WHERE id=:id"),
|
|
{"id": int(contact_id)}).fetchone()
|
|
if row:
|
|
validator_name = row.name
|
|
|
|
n = corr.mark_validation(db, ids, status, contact_id, validator_name,
|
|
forced_reason, notes, user.get("uid"))
|
|
return JSONResponse({"ok": True, "updated": n})
|
|
|
|
|
|
@router.get("/patching/validations/history/{server_id}", response_class=HTMLResponse)
|
|
async def validations_history(request: Request, server_id: int, db=Depends(get_db)):
|
|
user = get_current_user(request)
|
|
if not user:
|
|
return RedirectResponse(url="/login")
|
|
srv = db.execute(text("SELECT id, hostname FROM servers WHERE id=:id"), {"id": server_id}).fetchone()
|
|
if not srv:
|
|
return HTMLResponse("Serveur introuvable", status_code=404)
|
|
history = corr.get_validation_history(db, server_id)
|
|
ctx = base_context(request, db, user)
|
|
ctx.update({"app_name": APP_NAME, "server": srv, "history": history})
|
|
return templates.TemplateResponse("patching_validations_history.html", ctx)
|