"""Administration router — management of applications (application solutions).

Local application catalogue plus two-way synchronisation with iTop
``ApplicationSolution`` objects.
"""
import logging

from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text

from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..config import APP_NAME

logger = logging.getLogger(__name__)

router = APIRouter()
templates = Jinja2Templates(directory="app/templates")

CRIT_CHOICES = [
    ("critique", "Critique"),
    ("haute", "Haute"),
    ("standard", "Standard"),
    ("basse", "Basse"),
]

STATUS_CHOICES = [
    ("active", "Active"),
    ("obsolete", "Obsolete"),
    ("implementation", "En implémentation"),
]

# Reverse map used when pushing to iTop (iTop uses low/medium/high/critical).
CRIT_TO_ITOP = {"critique": "critical", "haute": "high", "standard": "medium", "basse": "low"}

# iTop CI classes that are searched, in this order, when resolving a hostname.
_ITOP_HOST_CLASSES = ("VirtualMachine", "Server")


def _check_admin(request, db):
    """Resolve the current user and check access to the admin pages.

    Returns a ``(user, perms, redirect)`` triple. ``redirect`` is ``None`` when
    access is granted; otherwise ``user`` and ``perms`` are ``None`` and
    ``redirect`` is a ``RedirectResponse`` to ``/login`` (no user) or
    ``/dashboard`` (user lacks both "settings" view and "users" admin rights).
    """
    user = get_current_user(request)
    if not user:
        return None, None, RedirectResponse(url="/login")
    perms = get_user_perms(db, user)
    if not can_view(perms, "settings") and not can_admin(perms, "users"):
        return None, None, RedirectResponse(url="/dashboard")
    return user, perms, None


def _can_write(perms):
    """Return a truthy value when ``perms`` allow modifying the catalogue."""
    return can_admin(perms, "users") or can_edit(perms, "settings")


def _itop_client(db):
    """Build an ``ITopClient`` from the stored secrets.

    Returns ``None`` when the URL, user or password secret is missing/empty.
    The service modules are imported lazily, as the original code did.
    """
    from ..services.itop_service import ITopClient
    from ..services.secrets_service import get_secret

    url = get_secret(db, "itop_url")
    login = get_secret(db, "itop_user")
    password = get_secret(db, "itop_pass")
    if not (url and login and password):
        return None
    return ITopClient(url, login, password)


def _oql_quote(value):
    """Return ``value`` as a double-quoted OQL string literal.

    Backslashes and double quotes are backslash-escaped so that a value coming
    from a request body cannot terminate the literal and alter the query.
    """
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def _find_itop_host(client, hostname):
    """Look up ``hostname`` in iTop among ``_ITOP_HOST_CLASSES``.

    Returns ``(class_name, key)`` for the first class that has a match, or
    ``(None, None)`` when no class returns an object.
    """
    for cls in _ITOP_HOST_CLASSES:
        found = client._call("core/get", **{
            "class": cls,
            "key": f"SELECT {cls} WHERE name = {_oql_quote(hostname)}",
            "output_fields": "name",
        })
        if found.get("objects"):
            return cls, list(found["objects"].values())[0]["key"]
    return None, None


async def _read_json_object(request):
    """Return the request body parsed as a JSON object, or ``None``.

    ``None`` is returned when the body is not valid JSON or is not a JSON
    object, so callers can answer with a clean error instead of a 500.
    """
    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError and UnicodeDecodeError derive from ValueError
        return None
    return body if isinstance(body, dict) else None


def _parse_server_ids(raw):
    """Extract integer ids from ``raw``, ignoring anything not made of digits.

    Non-list input yields an empty list. Values for which ``str.isdigit`` is
    true but ``int`` fails (e.g. superscript digits) are skipped rather than
    raising.
    """
    if not isinstance(raw, (list, tuple)):
        return []
    ids = []
    for item in raw:
        as_text = str(item)
        if not as_text.isdigit():
            continue
        try:
            ids.append(int(as_text))
        except ValueError:
            continue
    return ids


def _push_itop(db, itop_id, fields, operation="update"):
    """Push an ApplicationSolution change to iTop (best effort).

    ``operation`` is one of ``'update'``, ``'delete'`` or ``'create'``.

    Returns a dict with ``pushed`` (bool) and ``msg`` (str); a successful
    create that returns an object also carries ``itop_id`` (int). Never raises:
    any exception is logged and reported through ``msg`` (truncated to 120
    characters).
    """
    try:
        client = _itop_client(db)
        if client is None:
            return {"pushed": False, "msg": "Credentials iTop manquants"}
        if operation == "update":
            r = client.update("ApplicationSolution", itop_id, fields)
        elif operation == "create":
            r = client.create("ApplicationSolution", fields)
            if r.get("code") == 0 and r.get("objects"):
                new_id = list(r["objects"].values())[0]["key"]
                return {"pushed": True, "itop_id": int(new_id), "msg": "Créée dans iTop"}
        elif operation == "delete":
            r = client._call("core/delete", **{
                "class": "ApplicationSolution",
                "key": str(itop_id),
                "comment": "PatchCenter delete",
            })
        else:
            # Previously this fell through to an UnboundLocalError swallowed below.
            return {"pushed": False, "msg": f"Opération inconnue: {operation}"}
        if r.get("code") == 0:
            return {"pushed": True, "msg": "iTop OK"}
        return {"pushed": False, "msg": (r.get("message") or "")[:120]}
    except Exception as e:
        logger.warning("iTop %s push failed for ApplicationSolution %s", operation, itop_id, exc_info=True)
        return {"pushed": False, "msg": str(e)[:120]}


@router.get("/admin/applications", response_class=HTMLResponse)
async def applications_page(request: Request, db=Depends(get_db),
                            search: str = Query(""), criticite: str = Query(""),
                            status: str = Query(""), has_itop: str = Query(""),
                            domain: str = Query("")):
    """Render the application catalogue, with optional filters.

    Filters: ``search`` (ILIKE on short/full name), ``criticite``, ``status``,
    ``has_itop`` ("yes"/"no") and ``domain`` (applications having at least one
    server in that domain).
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return redirect

    where = ["1=1"]
    params = {}
    if search:
        where.append("(a.nom_court ILIKE :s OR a.nom_complet ILIKE :s)")
        params["s"] = f"%{search}%"
    if criticite:
        where.append("a.criticite = :c")
        params["c"] = criticite
    if status:
        where.append("a.status = :st")
        params["st"] = status
    if has_itop == "yes":
        where.append("a.itop_id IS NOT NULL")
    elif has_itop == "no":
        where.append("a.itop_id IS NULL")
    if domain:
        where.append("""a.id IN (
            SELECT DISTINCT s.application_id FROM servers s
            LEFT JOIN domain_environments de ON s.domain_env_id = de.id
            LEFT JOIN domains d ON de.domain_id = d.id
            WHERE d.name = :dom AND s.application_id IS NOT NULL
        )""")
        params["dom"] = domain

    # Only fixed SQL fragments are joined here; user values go through bind params.
    wc = " AND ".join(where)
    apps = db.execute(text(f"""
        SELECT a.id, a.itop_id, a.nom_court, a.nom_complet, a.description,
               a.criticite, a.status, a.editeur, a.created_at, a.updated_at,
               (SELECT COUNT(*) FROM servers s WHERE s.application_id = a.id) as nb_servers,
               (SELECT string_agg(DISTINCT d.name, ', ' ORDER BY d.name)
                FROM servers s
                LEFT JOIN domain_environments de ON s.domain_env_id = de.id
                LEFT JOIN domains d ON de.domain_id = d.id
                WHERE s.application_id = a.id AND d.name IS NOT NULL) as domains
        FROM applications a
        WHERE {wc}
        ORDER BY a.nom_court
    """), params).fetchall()

    domains_list = db.execute(text("SELECT name FROM domains ORDER BY name")).fetchall()

    stats = {
        "total": db.execute(text("SELECT COUNT(*) FROM applications")).scalar() or 0,
        "from_itop": db.execute(text("SELECT COUNT(*) FROM applications WHERE itop_id IS NOT NULL")).scalar() or 0,
        "used": db.execute(text("SELECT COUNT(*) FROM applications WHERE id IN (SELECT DISTINCT application_id FROM servers WHERE application_id IS NOT NULL)")).scalar() or 0,
    }

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "apps": apps,
        "stats": stats,
        "crit_choices": CRIT_CHOICES,
        "status_choices": STATUS_CHOICES,
        "domains_list": [d.name for d in domains_list],
        "filters": {"search": search, "criticite": criticite, "status": status,
                    "has_itop": has_itop, "domain": domain},
        "can_edit": _can_write(perms),
        "msg": request.query_params.get("msg", ""),
    })
    return templates.TemplateResponse("admin_applications.html", ctx)


@router.post("/admin/applications/add")
async def applications_add(request: Request, db=Depends(get_db),
                           nom_court: str = Form(...), nom_complet: str = Form(""),
                           description: str = Form(""), editeur: str = Form(""),
                           criticite: str = Form("basse"), status: str = Form("active"),
                           push_itop: str = Form("")):
    """Create an application locally, optionally creating it in iTop first.

    Redirects back to the catalogue with ``msg`` set to ``forbidden``,
    ``exists`` (case-insensitive duplicate short name) or ``added``.
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return redirect
    if not _can_write(perms):
        return RedirectResponse(url="/admin/applications?msg=forbidden", status_code=303)

    name = nom_court.strip()[:50]
    full = (nom_complet.strip() or name)[:200]
    existing = db.execute(text("SELECT id FROM applications WHERE LOWER(nom_court)=LOWER(:n)"),
                          {"n": name}).fetchone()
    if existing:
        return RedirectResponse(url="/admin/applications?msg=exists", status_code=303)

    # The iTop id is only stored when the remote creation succeeded.
    itop_id = None
    if push_itop == "on":
        r = _push_itop(db, None, {
            "name": name,
            "description": description.strip()[:500],
            "business_criticity": CRIT_TO_ITOP.get(criticite, "low"),
            "status": status,
        }, operation="create")
        if r.get("pushed") and r.get("itop_id"):
            itop_id = r["itop_id"]

    db.execute(text("""INSERT INTO applications
        (nom_court, nom_complet, description, editeur, criticite, status, itop_id)
        VALUES (:n, :nc, :d, :e, :c, :s, :iid)"""),
               {"n": name, "nc": full, "d": description.strip()[:500],
                "e": editeur.strip()[:100], "c": criticite, "s": status, "iid": itop_id})
    db.commit()
    return RedirectResponse(url="/admin/applications?msg=added", status_code=303)


@router.post("/admin/applications/{app_id}/edit")
async def applications_edit(request: Request, app_id: int, db=Depends(get_db),
                            nom_court: str = Form(...), nom_complet: str = Form(""),
                            description: str = Form(""), editeur: str = Form(""),
                            criticite: str = Form("basse"), status: str = Form("active")):
    """Update an application, propagate its short name to linked servers and
    push the change to iTop when the application is linked to an iTop object.

    Redirects with ``msg`` set to ``forbidden``, ``notfound`` or
    ``edited`` / ``edited_itop_ok`` / ``edited_itop_ko``.
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return redirect
    if not _can_write(perms):
        return RedirectResponse(url="/admin/applications?msg=forbidden", status_code=303)

    row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"),
                     {"id": app_id}).fetchone()
    if not row:
        return RedirectResponse(url="/admin/applications?msg=notfound", status_code=303)

    name = nom_court.strip()[:50]
    full = (nom_complet.strip() or name)[:200]
    db.execute(text("""UPDATE applications SET nom_court=:n, nom_complet=:nc,
        description=:d, editeur=:e, criticite=:c, status=:s, updated_at=NOW()
        WHERE id=:id"""),
               {"n": name, "nc": full, "d": description.strip()[:500],
                "e": editeur.strip()[:100], "c": criticite, "s": status, "id": app_id})
    # Propagate the short name to linked servers in the same transaction so the
    # catalogue and the denormalised servers.application_name cannot diverge.
    db.execute(text("UPDATE servers SET application_name=:an WHERE application_id=:aid"),
               {"an": name, "aid": app_id})
    db.commit()

    # Push to iTop when the application is linked.
    itop_msg = ""
    if row.itop_id:
        r = _push_itop(db, row.itop_id, {
            "name": name,
            "description": description.strip()[:500],
            "business_criticity": CRIT_TO_ITOP.get(criticite, "low"),
            "status": status,
        }, operation="update")
        itop_msg = "_itop_ok" if r.get("pushed") else "_itop_ko"
    return RedirectResponse(url=f"/admin/applications?msg=edited{itop_msg}", status_code=303)


@router.get("/admin/applications/multi-app", response_class=HTMLResponse)
async def applications_multi_app(request: Request, db=Depends(get_db)):
    """List hosts linked to several applications (source: iTop ``applicationsolution_list``)."""
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return redirect

    multi = []
    try:
        client = _itop_client(db)
        if client is not None:
            for cls in _ITOP_HOST_CLASSES:
                r = client._call("core/get", **{
                    "class": cls,
                    "key": f"SELECT {cls}",
                    "output_fields": "name,applicationsolution_list",
                })
                for v in (r.get("objects") or {}).values():
                    linked = v["fields"].get("applicationsolution_list", [])
                    if len(linked) >= 2:
                        multi.append({
                            "hostname": v["fields"].get("name"),
                            "apps": [{"name": a.get("applicationsolution_name"),
                                      "itop_id": int(a.get("applicationsolution_id", 0))}
                                     for a in linked],
                        })
    except Exception:
        # Best effort: the page is still rendered with whatever was collected.
        logger.warning("iTop multi-application lookup failed", exc_info=True)

    # Enrich each entry with the application currently set in the local database.
    for m in multi:
        hn = (m["hostname"] or "").split(".")[0].lower()
        row = db.execute(text("""SELECT application_id, application_name FROM servers
            WHERE LOWER(hostname)=:h"""), {"h": hn}).fetchone()
        m["current_app_name"] = row.application_name if row else None
        m["current_app_id"] = row.application_id if row else None

    ctx = base_context(request, db, user)
    ctx.update({"app_name": APP_NAME, "multi_servers": multi})
    return templates.TemplateResponse("admin_applications_multi.html", ctx)


@router.post("/admin/applications/keep-single-app")
async def applications_keep_single(request: Request, db=Depends(get_db)):
    """For one host, keep a single application among several (local update + iTop push).

    Expects a JSON object with ``hostname`` and ``keep_itop_id``. Returns a JSON
    object with ``ok`` and either ``app_name`` or ``msg``.
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    if not _can_write(perms):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    body = await _read_json_object(request)
    if body is None:
        return JSONResponse({"ok": False, "msg": "JSON invalide"}, status_code=400)

    # Hosts are matched on their lower-cased short name (text before the first dot).
    hostname = str(body.get("hostname") or "").strip().split(".")[0].lower()
    keep_itop_id = body.get("keep_itop_id")
    if not hostname or not keep_itop_id:
        return JSONResponse({"ok": False, "msg": "Paramètres manquants"})
    try:
        keep_id = int(keep_itop_id)
    except (TypeError, ValueError):
        return JSONResponse({"ok": False, "msg": "Paramètres manquants"})

    # Find the local application by its iTop id.
    app = db.execute(text("SELECT id, nom_court FROM applications WHERE itop_id=:iid"),
                     {"iid": keep_id}).fetchone()
    if not app:
        return JSONResponse({"ok": False, "msg": "App iTop introuvable dans catalogue"})

    # Local update.
    db.execute(text("""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW()
        WHERE LOWER(hostname)=:h"""), {"aid": app.id, "an": app.nom_court, "h": hostname})
    db.commit()

    # iTop push: replace applicationsolution_list with the single kept application.
    try:
        client = _itop_client(db)
        if client is not None:
            cls, key = _find_itop_host(client, hostname)
            if cls is not None:
                client.update(cls, key, {"applicationsolution_list": [
                    {"applicationsolution_id": keep_id}]})
    except Exception:
        # Best effort: the local change is already committed.
        logger.warning("iTop push failed for host %s", hostname, exc_info=True)

    return JSONResponse({"ok": True, "app_name": app.nom_court})


@router.get("/admin/applications/{app_id}/assign", response_class=HTMLResponse)
async def applications_assign_page(request: Request, app_id: int, db=Depends(get_db),
                                   search: str = Query(""), domain: str = Query(""),
                                   env: str = Query(""), assigned: str = Query(""),
                                   page: int = Query(1), per_page: int = Query(50)):
    """Render the bulk server-to-application assignment page.

    Servers in state 'stock' or 'obsolete' are excluded. ``assigned`` accepts
    "none", "other" or "current"; ``per_page`` is clamped to 20..200 and
    ``page`` to the valid page range.
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return redirect

    app = db.execute(text("""SELECT id, itop_id, nom_court, nom_complet FROM applications WHERE id=:id"""),
                     {"id": app_id}).fetchone()
    if not app:
        return RedirectResponse(url="/admin/applications?msg=notfound", status_code=303)

    where = ["s.etat NOT IN ('stock','obsolete')"]
    params = {}
    if search:
        where.append("s.hostname ILIKE :s")
        params["s"] = f"%{search}%"
    if domain:
        where.append("d.name = :dom")
        params["dom"] = domain
    if env:
        where.append("e.name = :env")
        params["env"] = env
    if assigned == "none":
        where.append("s.application_id IS NULL")
    elif assigned == "other":
        where.append("s.application_id IS NOT NULL AND s.application_id != :aid")
        params["aid"] = app_id
    elif assigned == "current":
        where.append("s.application_id = :aid2")
        params["aid2"] = app_id
    wc = " AND ".join(where)

    total = db.execute(text(f"""SELECT COUNT(*) FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE {wc}"""), params).scalar() or 0

    per_page = max(20, min(per_page, 200))
    total_pages = max(1, (total + per_page - 1) // per_page)
    page = max(1, min(page, total_pages))
    offset = (page - 1) * per_page

    rows = db.execute(text(f"""SELECT s.id, s.hostname, s.os_family,
            s.application_id, s.application_name,
            d.name as domain_name, e.name as env_name
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE {wc}
        ORDER BY s.hostname
        LIMIT :lim OFFSET :off"""), {**params, "lim": per_page, "off": offset}).fetchall()

    domains_list = db.execute(text("SELECT name FROM domains ORDER BY name")).fetchall()
    envs_list = db.execute(text("SELECT name FROM environments ORDER BY name")).fetchall()

    ctx = base_context(request, db, user)
    ctx.update({
        "app_name": APP_NAME,
        "app": app,
        "servers": rows,
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "domains_list": [d.name for d in domains_list],
        "envs_list": [e.name for e in envs_list],
        "filters": {"search": search, "domain": domain, "env": env, "assigned": assigned},
    })
    return templates.TemplateResponse("admin_applications_assign.html", ctx)


@router.post("/admin/applications/{app_id}/assign")
async def applications_assign(request: Request, app_id: int, db=Depends(get_db)):
    """Assign the servers listed in the JSON body (``server_ids``) to an application.

    The local update is committed first; when the application is linked to
    iTop, each host found in iTop (VirtualMachine or Server) gets its
    ``applicationsolution_list`` replaced by this application. The JSON answer
    reports ``updated``, ``itop_pushed`` and ``itop_errors``.
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return JSONResponse({"ok": False, "msg": "Non authentifié"}, status_code=401)
    if not _can_write(perms):
        return JSONResponse({"ok": False, "msg": "Permission refusée"}, status_code=403)

    app = db.execute(text("SELECT id, itop_id, nom_court FROM applications WHERE id=:id"),
                     {"id": app_id}).fetchone()
    if not app:
        return JSONResponse({"ok": False, "msg": "Application introuvable"}, status_code=404)

    body = await _read_json_object(request)
    if body is None:
        return JSONResponse({"ok": False, "msg": "JSON invalide"}, status_code=400)
    server_ids = _parse_server_ids(body.get("server_ids", []))
    if not server_ids:
        return JSONResponse({"ok": False, "msg": "Aucun serveur sélectionné"})

    # Safe to inline: every element is an int produced by _parse_server_ids.
    placeholders = ",".join(str(i) for i in server_ids)
    db.execute(text(f"""UPDATE servers SET application_id=:aid, application_name=:an, updated_at=NOW()
        WHERE id IN ({placeholders})"""), {"aid": app_id, "an": app.nom_court})
    db.commit()

    # iTop push (best effort; the local change is already committed).
    itop_pushed = 0
    itop_errors = 0
    if app.itop_id:
        try:
            client = _itop_client(db)
            if client is not None:
                new_list = [{"applicationsolution_id": int(app.itop_id)}]
                hosts = db.execute(text(
                    f"SELECT hostname FROM servers WHERE id IN ({placeholders})")).fetchall()
                for h in hosts:
                    try:
                        cls, key = _find_itop_host(client, h.hostname)
                        if cls is None:
                            # Host unknown to iTop: neither pushed nor an error.
                            continue
                        upd = client.update(cls, key, {"applicationsolution_list": new_list})
                        if upd.get("code") == 0:
                            itop_pushed += 1
                        else:
                            itop_errors += 1
                    except Exception:
                        logger.warning("iTop push failed for host %s", h.hostname, exc_info=True)
                        itop_errors += 1
        except Exception:
            logger.warning("iTop push aborted for application %s", app_id, exc_info=True)

    return JSONResponse({"ok": True, "updated": len(server_ids),
                         "itop_pushed": itop_pushed, "itop_errors": itop_errors,
                         "app_name": app.nom_court})


@router.post("/admin/applications/{app_id}/delete")
async def applications_delete(request: Request, app_id: int, db=Depends(get_db)):
    """Delete an application: unlink its servers, delete it locally, then
    delete the linked iTop object when there is one.

    Redirects with ``msg`` set to ``forbidden``, ``notfound`` or
    ``deleted_<n>`` followed by ``_itop_ok`` / ``_itop_ko`` when an iTop
    delete was attempted, ``<n>`` being the number of unlinked servers.
    """
    user, perms, redirect = _check_admin(request, db)
    if redirect:
        return redirect
    if not _can_write(perms):
        return RedirectResponse(url="/admin/applications?msg=forbidden", status_code=303)

    row = db.execute(text("SELECT itop_id, nom_court FROM applications WHERE id=:id"),
                     {"id": app_id}).fetchone()
    if not row:
        return RedirectResponse(url="/admin/applications?msg=notfound", status_code=303)

    # Unlink the servers first.
    n_servers = db.execute(text(
        "UPDATE servers SET application_id=NULL, application_name=NULL WHERE application_id=:aid"),
        {"aid": app_id}).rowcount
    # Delete locally.
    db.execute(text("DELETE FROM applications WHERE id=:id"), {"id": app_id})
    db.commit()

    # Push the delete to iTop when the application was linked.
    itop_msg = ""
    if row.itop_id:
        r = _push_itop(db, row.itop_id, {}, operation="delete")
        itop_msg = "_itop_ok" if r.get("pushed") else "_itop_ko"
    return RedirectResponse(url=f"/admin/applications?msg=deleted_{n_servers}{itop_msg}", status_code=303)