"""Router Referentiel — CRUD domaines, environnements, associations, zones""" from fastapi import APIRouter, Request, Depends, Form, Query from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse from fastapi.templating import Jinja2Templates from sqlalchemy import text from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit templates = Jinja2Templates(directory="app/templates") router = APIRouter() # ========================================================= # PAGE PRINCIPALE (onglets) # ========================================================= @router.get("/referentiel", response_class=HTMLResponse) def referentiel_page(request: Request, db=Depends(get_db), tab: str = Query("domains")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_view(perms, "settings"): return RedirectResponse(url="/dashboard") can_modify = can_edit(perms, "settings") # Domaines domains = db.execute(text( "SELECT id, name, code, description, default_excludes, default_patch_window, " "default_patch_frequency, is_active, display_order FROM domains ORDER BY display_order, name" )).fetchall() # Environnements envs = db.execute(text( "SELECT id, name, code FROM environments ORDER BY id" )).fetchall() # Zones zones = db.execute(text( "SELECT id, name, description, is_dmz FROM zones ORDER BY id" )).fetchall() # Associations domain_environments assocs = db.execute(text(""" SELECT de.id, d.name as domain_name, d.id as domain_id, e.name as env_name, e.id as env_id, de.responsable_nom, de.responsable_email, de.referent_nom, de.referent_email, de.patch_window, de.patch_excludes, de.nb_servers, de.is_active FROM domain_environments de JOIN domains d ON de.domain_id = d.id JOIN environments e ON de.environment_id = e.id ORDER BY d.display_order, d.name, e.id """)).fetchall() # Domaines DNS (domain_ltd) dns_domains = db.execute(text( "SELECT id, name, description, is_active FROM domain_ltd_list ORDER BY name" )).fetchall() # Compteur serveurs par domain_ltd dns_srv_counts = {} rows = db.execute(text(""" SELECT dl.id, COUNT(s.id) as cnt FROM domain_ltd_list dl LEFT JOIN servers s ON s.domain_ltd = dl.name GROUP BY dl.id """)).fetchall() for r in rows: dns_srv_counts[r.id] = r.cnt # Compteur serveurs par domaine dom_srv_counts = {} rows = db.execute(text(""" SELECT d.id, COUNT(s.id) as cnt FROM domains d LEFT JOIN domain_environments de ON de.domain_id = d.id LEFT JOIN servers s ON s.domain_env_id = de.id GROUP BY d.id """)).fetchall() for r in rows: dom_srv_counts[r.id] = r.cnt # Compteur serveurs par env env_srv_counts = {} rows = db.execute(text(""" SELECT e.id, COUNT(s.id) as cnt FROM environments e LEFT JOIN domain_environments de ON de.environment_id = e.id LEFT JOIN servers s ON s.domain_env_id = de.id GROUP BY e.id """)).fetchall() for r in rows: env_srv_counts[r.id] = r.cnt # Compteur serveurs par zone zone_srv_counts = {} rows = db.execute(text(""" SELECT z.id, COUNT(s.id) as cnt FROM zones z LEFT JOIN servers s ON s.zone_id = z.id GROUP BY z.id """)).fetchall() for r in rows: zone_srv_counts[r.id] = r.cnt # Dernière synchro from ..services.itop_service import get_last_sync last_sync_from = get_last_sync(db, "from") last_sync_to = get_last_sync(db, "to") return templates.TemplateResponse("referentiel.html", { "request": request, "user": user, "perms": perms, "can_modify": can_modify, "tab": tab, "domains": domains, "envs": envs, "zones": zones, "assocs": assocs, "dns_domains": dns_domains, "dns_srv_counts": dns_srv_counts, "dom_srv_counts": dom_srv_counts, "env_srv_counts": env_srv_counts, "zone_srv_counts": zone_srv_counts, "last_sync_from": last_sync_from, "last_sync_to": last_sync_to, }) # ========================================================= # DOMAINES CRUD # ========================================================= @router.post("/referentiel/domains/add") def domain_add(request: Request, db=Depends(get_db), name: str = Form(...), code: str = Form(...), description: str = Form(""), default_excludes: str = Form(""), default_patch_window: str = Form(""), display_order: int = Form(0)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=domains") db.execute(text(""" INSERT INTO domains (name, code, description, default_excludes, default_patch_window, display_order) VALUES (:name, :code, :desc, :excl, :pw, :ord) """), {"name": name.strip(), "code": code.strip().upper(), "desc": description.strip(), "excl": default_excludes.strip(), "pw": default_patch_window.strip(), "ord": display_order}) db.commit() return RedirectResponse(url="/referentiel?tab=domains&msg=added", status_code=303) @router.post("/referentiel/domains/{domain_id}/edit") def domain_edit(request: Request, domain_id: int, db=Depends(get_db), name: str = Form(...), code: str = Form(...), description: str = Form(""), default_excludes: str = Form(""), default_patch_window: str = Form(""), display_order: int = Form(0), is_active: str = Form("off")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=domains") active = is_active == "on" db.execute(text(""" UPDATE domains SET name=:name, code=:code, description=:desc, default_excludes=:excl, default_patch_window=:pw, display_order=:ord, is_active=:act, updated_at=now() WHERE id=:id """), {"id": domain_id, "name": name.strip(), "code": code.strip().upper(), "desc": description.strip(), "excl": default_excludes.strip(), "pw": default_patch_window.strip(), "ord": display_order, "act": active}) db.commit() return RedirectResponse(url="/referentiel?tab=domains&msg=updated", status_code=303) @router.post("/referentiel/domains/{domain_id}/delete") def domain_delete(request: Request, domain_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=domains") # Verifier s'il y a des serveurs lies cnt = db.execute(text(""" SELECT COUNT(*) as c FROM servers s JOIN domain_environments de ON s.domain_env_id = de.id WHERE de.domain_id = :id """), {"id": domain_id}).fetchone().c if cnt > 0: return RedirectResponse( url=f"/referentiel?tab=domains&msg=nodelete&detail={cnt}", status_code=303) db.execute(text("DELETE FROM domain_environments WHERE domain_id = :id"), {"id": domain_id}) db.execute(text("DELETE FROM domains WHERE id = :id"), {"id": domain_id}) db.commit() return RedirectResponse(url="/referentiel?tab=domains&msg=deleted", status_code=303) # ========================================================= # ENVIRONNEMENTS CRUD # ========================================================= @router.post("/referentiel/envs/add") def env_add(request: Request, db=Depends(get_db), name: str = Form(...), code: str = Form(...)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=envs") db.execute(text(""" INSERT INTO environments (name, code) VALUES (:name, :code) """), {"name": name.strip(), "code": code.strip().upper()}) db.commit() return RedirectResponse(url="/referentiel?tab=envs&msg=added", status_code=303) @router.post("/referentiel/envs/{env_id}/edit") def env_edit(request: Request, env_id: int, db=Depends(get_db), name: str = Form(...), code: str = Form(...)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=envs") db.execute(text(""" UPDATE environments SET name=:name, code=:code WHERE id=:id """), {"id": env_id, "name": name.strip(), "code": code.strip().upper()}) db.commit() return RedirectResponse(url="/referentiel?tab=envs&msg=updated", status_code=303) @router.post("/referentiel/envs/{env_id}/delete") def env_delete(request: Request, env_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=envs") cnt = db.execute(text(""" SELECT COUNT(*) as c FROM servers s JOIN domain_environments de ON s.domain_env_id = de.id WHERE de.environment_id = :id """), {"id": env_id}).fetchone().c if cnt > 0: return RedirectResponse( url=f"/referentiel?tab=envs&msg=nodelete&detail={cnt}", status_code=303) db.execute(text("DELETE FROM domain_environments WHERE environment_id = :id"), {"id": env_id}) db.execute(text("DELETE FROM environments WHERE id = :id"), {"id": env_id}) db.commit() return RedirectResponse(url="/referentiel?tab=envs&msg=deleted", status_code=303) # ========================================================= # ZONES CRUD # ========================================================= @router.post("/referentiel/zones/add") def zone_add(request: Request, db=Depends(get_db), name: str = Form(...), description: str = Form(""), is_dmz: str = Form("off")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=zones") db.execute(text(""" INSERT INTO zones (name, description, is_dmz) VALUES (:name, :desc, :dmz) """), {"name": name.strip(), "desc": description.strip(), "dmz": is_dmz == "on"}) db.commit() return RedirectResponse(url="/referentiel?tab=zones&msg=added", status_code=303) @router.post("/referentiel/zones/{zone_id}/edit") def zone_edit(request: Request, zone_id: int, db=Depends(get_db), name: str = Form(...), description: str = Form(""), is_dmz: str = Form("off")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=zones") db.execute(text(""" UPDATE zones SET name=:name, description=:desc, is_dmz=:dmz WHERE id=:id """), {"id": zone_id, "name": name.strip(), "desc": description.strip(), "dmz": is_dmz == "on"}) db.commit() return RedirectResponse(url="/referentiel?tab=zones&msg=updated", status_code=303) @router.post("/referentiel/zones/{zone_id}/delete") def zone_delete(request: Request, zone_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=zones") cnt = db.execute(text( "SELECT COUNT(*) as c FROM servers WHERE zone_id = :id" ), {"id": zone_id}).fetchone().c if cnt > 0: return RedirectResponse( url=f"/referentiel?tab=zones&msg=nodelete&detail={cnt}", status_code=303) db.execute(text("DELETE FROM zones WHERE id = :id"), {"id": zone_id}) db.commit() return RedirectResponse(url="/referentiel?tab=zones&msg=deleted", status_code=303) # ========================================================= # ASSOCIATIONS DOMAIN x ENV # ========================================================= @router.post("/referentiel/assocs/add") def assoc_add(request: Request, db=Depends(get_db), domain_id: int = Form(...), environment_id: int = Form(...), responsable_nom: str = Form(""), responsable_email: str = Form(""), referent_nom: str = Form(""), referent_email: str = Form(""), patch_window: str = Form(""), patch_excludes: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=assocs") existing = db.execute(text( "SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e" ), {"d": domain_id, "e": environment_id}).fetchone() if existing: return RedirectResponse(url="/referentiel?tab=assocs&msg=exists", status_code=303) db.execute(text(""" INSERT INTO domain_environments (domain_id, environment_id, responsable_nom, responsable_email, referent_nom, referent_email, patch_window, patch_excludes) VALUES (:d, :e, :rn, :re, :fn, :fe, :pw, :pe) """), {"d": domain_id, "e": environment_id, "rn": responsable_nom.strip(), "re": responsable_email.strip(), "fn": referent_nom.strip(), "fe": referent_email.strip(), "pw": patch_window.strip(), "pe": patch_excludes.strip()}) db.commit() return RedirectResponse(url="/referentiel?tab=assocs&msg=added", status_code=303) @router.post("/referentiel/assocs/{assoc_id}/edit") def assoc_edit(request: Request, assoc_id: int, db=Depends(get_db), responsable_nom: str = Form(""), responsable_email: str = Form(""), referent_nom: str = Form(""), referent_email: str = Form(""), patch_window: str = Form(""), patch_excludes: str = Form(""), is_active: str = Form("off")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=assocs") db.execute(text(""" UPDATE domain_environments SET responsable_nom=:rn, responsable_email=:re, referent_nom=:fn, referent_email=:fe, patch_window=:pw, patch_excludes=:pe, is_active=:act WHERE id=:id """), {"id": assoc_id, "rn": responsable_nom.strip(), "re": responsable_email.strip(), "fn": referent_nom.strip(), "fe": referent_email.strip(), "pw": patch_window.strip(), "pe": patch_excludes.strip(), "act": is_active == "on"}) db.commit() return RedirectResponse(url="/referentiel?tab=assocs&msg=updated", status_code=303) @router.post("/referentiel/assocs/{assoc_id}/delete") def assoc_delete(request: Request, assoc_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=assocs") cnt = db.execute(text( "SELECT COUNT(*) as c FROM servers WHERE domain_env_id = :id" ), {"id": assoc_id}).fetchone().c if cnt > 0: return RedirectResponse( url=f"/referentiel?tab=assocs&msg=nodelete&detail={cnt}", status_code=303) db.execute(text("DELETE FROM domain_environments WHERE id = :id"), {"id": assoc_id}) db.commit() return RedirectResponse(url="/referentiel?tab=assocs&msg=deleted", status_code=303) # ========================================================= # DOMAINES DNS (domain_ltd) # ========================================================= @router.post("/referentiel/dns/add") def dns_add(request: Request, db=Depends(get_db), name: str = Form(...), description: str = Form("")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=dns") db.execute(text(""" INSERT INTO domain_ltd_list (name, description) VALUES (:name, :desc) """), {"name": name.strip().lower(), "desc": description.strip()}) db.commit() return RedirectResponse(url="/referentiel?tab=dns&msg=added", status_code=303) @router.post("/referentiel/dns/{dns_id}/edit") def dns_edit(request: Request, dns_id: int, db=Depends(get_db), name: str = Form(...), description: str = Form(""), is_active: str = Form("off")): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=dns") old = db.execute(text("SELECT name FROM domain_ltd_list WHERE id=:id"), {"id": dns_id}).fetchone() new_name = name.strip().lower() db.execute(text(""" UPDATE domain_ltd_list SET name=:name, description=:desc, is_active=:act WHERE id=:id """), {"id": dns_id, "name": new_name, "desc": description.strip(), "act": is_active == "on"}) # Propager le renommage sur les serveurs if old and old.name != new_name: db.execute(text("UPDATE servers SET domain_ltd=:new WHERE domain_ltd=:old"), {"old": old.name, "new": new_name}) db.commit() return RedirectResponse(url="/referentiel?tab=dns&msg=updated", status_code=303) @router.post("/referentiel/dns/{dns_id}/delete") def dns_delete(request: Request, dns_id: int, db=Depends(get_db)): user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=dns") row = db.execute(text("SELECT name FROM domain_ltd_list WHERE id=:id"), {"id": dns_id}).fetchone() if row: cnt = db.execute(text( "SELECT COUNT(*) as c FROM servers WHERE domain_ltd = :n" ), {"n": row.name}).fetchone().c if cnt > 0: return RedirectResponse( url=f"/referentiel?tab=dns&msg=nodelete&detail={cnt}", status_code=303) db.execute(text("DELETE FROM domain_ltd_list WHERE id = :id"), {"id": dns_id}) db.commit() return RedirectResponse(url="/referentiel?tab=dns&msg=deleted", status_code=303) # ========================================================= # SYNC iTop # ========================================================= @router.post("/referentiel/itop/sync-from") def itop_sync_from(request: Request, db=Depends(get_db)): """Importer serveurs et contacts depuis iTop""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=domains") from ..services.secrets_service import get_secret from ..services.itop_service import sync_from_itop itop_url = get_secret(db, "itop_url") itop_user = get_secret(db, "itop_user") itop_pass = get_secret(db, "itop_pass") if not itop_url or not itop_user: return RedirectResponse(url="/referentiel?tab=domains&msg=itop_noconfig", status_code=303) try: stats = sync_from_itop(db, itop_url, itop_user, itop_pass) msg = f"itop_from_{stats['servers_created']}_{stats['servers_updated']}_{stats['contacts']}_{stats['domains']}" except Exception as e: import traceback; traceback.print_exc() msg = f"itop_error" return RedirectResponse(url=f"/referentiel?tab=domains&msg={msg}", status_code=303) @router.post("/referentiel/itop/sync-to") def itop_sync_to(request: Request, db=Depends(get_db)): """Exporter serveurs PatchCenter vers iTop""" user = get_current_user(request) if not user: return RedirectResponse(url="/login") perms = get_user_perms(db, user) if not can_edit(perms, "settings"): return RedirectResponse(url="/referentiel?tab=domains") from ..services.secrets_service import get_secret from ..services.itop_service import sync_to_itop itop_url = get_secret(db, "itop_url") itop_user = get_secret(db, "itop_user") itop_pass = get_secret(db, "itop_pass") if not itop_url or not itop_user: return RedirectResponse(url="/referentiel?tab=domains&msg=itop_noconfig", status_code=303) try: stats = sync_to_itop(db, itop_url, itop_user, itop_pass) msg = f"itop_to_{stats.get('servers_updated',0)}_{stats.get('servers_created',0)}_{stats.get('ref_created',0)}" except Exception as e: import traceback; traceback.print_exc() msg = "itop_error" return RedirectResponse(url=f"/referentiel?tab=domains&msg={msg}", status_code=303)