patchcenter/app/routers/specifics.py
Khalid MOUTAOUAKIL 13290c1ebb Phase 1 securite: permission checks sur tous les routers
- auth: verification is_active au login (compte desactive = bloque)
- settings: enforcement backend can_edit(settings) + role/section
- servers: can_view/can_edit(servers) sur toutes les routes
- planning: can_view/can_edit(planning) sur toutes les routes
- specifics: can_view/can_edit(specifics) sur toutes les routes
- contacts: rattache au module servers (can_view/can_edit)
- campaigns: can_view/can_edit(campaigns) sur toutes les routes manquantes
- audit/audit_full: can_view/can_edit(audit) sur toutes les routes
- qualys: can_view/can_edit(qualys) sur toutes les routes
- safe_patching: perm checks + authentification sur SSE stream
- quickwin: can_view/can_edit(campaigns|quickwin) sur toutes les routes

97 points d'injection securises, 0 route sans controle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 16:46:05 +02:00

175 lines
9.2 KiB
Python

"""Router serveurs specifiques — vue et edition des specificites patching"""
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user, get_user_perms, can_view, can_edit, can_admin, base_context
from ..config import APP_NAME
# Router object on which every /specifics route of this module is registered.
router = APIRouter()
# Template loader used by the HTML views below.
# NOTE(review): "app/templates" is a relative path — presumably resolved
# against the process working directory; confirm against the deployment setup.
templates = Jinja2Templates(directory="app/templates")
# Application types offered to the templates (passed as "app_types" by the
# list and edit views). The save/add handlers in this module do not validate
# submitted values against this list.
APP_TYPES = [
"SAP BOC", "Oracle ASM", "Oracle OEM", "Podman FL", "OCR Flux Libre",
"SI Patrimoine", "HAProxy FL", "Site institutionnel", "Centreon Poller",
"Sextan", "OCTAN", "DATI", "Covoiturage", "Scoop Docker", "Splunk Enterprise",
"SAS BI", "SMTP Relay", "PostgreSQL", "Masterparc", "Gaspar",
"Temps de parcours", "PAIPOR", "COMMVAULT", "Talend", "Autre",
]
def _list_specifics(db, app_type=None, search=None):
    """Return every server_specifics row joined with its server details.

    Args:
        db: database session exposing ``execute()``.
        app_type: when truthy, keep only rows whose ``app_type`` equals it.
        search: when truthy, keep only rows whose server hostname matches
            ``%search%`` case-insensitively (ILIKE).

    Returns:
        The list produced by ``fetchall()``, ordered by app type, patch order
        group, reboot order and hostname.
    """
    # (switch, SQL condition, bind name, bind value) for each optional filter.
    optional_filters = (
        (app_type, "ss.app_type = :at", "at", app_type),
        (search, "s.hostname ILIKE :q", "q", f"%{search}%"),
    )

    # "1=1" keeps the WHERE clause valid when no filter is active.
    conditions = ["1=1"]
    bind_values = {}
    for switch, condition, bind_name, bind_value in optional_filters:
        if switch:
            conditions.append(condition)
            bind_values[bind_name] = bind_value

    # Only fixed fragments are interpolated; user input goes through binds.
    where_sql = " AND ".join(conditions)
    query = text(f"""
SELECT ss.*, s.hostname, s.fqdn, s.os_family, s.tier,
d.name as domaine, e.name as environnement,
dep.hostname as dep_hostname
FROM server_specifics ss
JOIN servers s ON ss.server_id = s.id
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
LEFT JOIN domains d ON de.domain_id = d.id
LEFT JOIN environments e ON de.environment_id = e.id
LEFT JOIN servers dep ON ss.dependency_server_id = dep.id
WHERE {where_sql}
ORDER BY ss.app_type, ss.patch_order_group, ss.reboot_order, s.hostname
""")
    result = db.execute(query, bind_values)
    return result.fetchall()
@router.get("/specifics", response_class=HTMLResponse)
async def specifics_list(request: Request, db=Depends(get_db),
                         app_type: str = Query(None), search: str = Query(None)):
    """Render the list page of server specifics, optionally filtered.

    Redirects to ``/login`` when no user is returned for the request and to
    ``/dashboard`` when ``can_view(perms, "specifics")`` is falsy.

    Args:
        request: incoming request.
        db: database session injected through ``get_db``.
        app_type: optional exact filter on the application type.
        search: optional hostname substring filter.
    """
    user = get_current_user(request)
    if not user:
        return RedirectResponse(url="/login")
    if not can_view(get_user_perms(db, user), "specifics"):
        return RedirectResponse(url="/dashboard")

    rows = _list_specifics(db, app_type, search)

    # Application types actually present in the table.
    distinct_types_sql = text(
        "SELECT DISTINCT app_type FROM server_specifics WHERE app_type IS NOT NULL ORDER BY app_type"
    )
    type_rows = db.execute(distinct_types_sql).fetchall()
    known_types = [type_row.app_type for type_row in type_rows]

    context = {
        "request": request,
        "user": user,
        "app_name": APP_NAME,
        "entries": rows,
        "app_types": APP_TYPES,
        "types_in_db": known_types,
        "filter_type": app_type,
        "filter_search": search,
    }
    return templates.TemplateResponse("specifics.html", context)
@router.get("/specifics/{spec_id}/edit", response_class=HTMLResponse)
async def specific_edit(request: Request, spec_id: int, db=Depends(get_db)):
    """Return the edit-form partial for one server_specifics row.

    Short HTML fragments are returned instead of the form when no user is
    found for the request, when ``can_edit(perms, "specifics")`` is falsy
    (status 403) or when no row has the given id.

    Args:
        request: incoming request.
        spec_id: primary key of the server_specifics row to edit.
        db: database session injected through ``get_db``.
    """
    user = get_current_user(request)
    if not user:
        # NOTE(review): this reply and the not-found one below use the default
        # status code, unlike the 403 — confirm whether that is intentional.
        return HTMLResponse("<p>Non autorise</p>")
    if not can_edit(get_user_perms(db, user), "specifics"):
        return HTMLResponse("<p>Acces interdit</p>", status_code=403)

    lookup = text("""
SELECT ss.*, s.hostname FROM server_specifics ss
JOIN servers s ON ss.server_id = s.id WHERE ss.id = :id
""")
    record = db.execute(lookup, {"id": spec_id}).fetchone()
    if not record:
        return HTMLResponse("<p>Non trouve</p>")

    context = {"request": request, "sp": record, "app_types": APP_TYPES}
    return templates.TemplateResponse("partials/specific_edit.html", context)
@router.post("/specifics/{spec_id}/save")
async def specific_save(request: Request, spec_id: int, db=Depends(get_db)):
    """Persist the edit form of one server_specifics row, then redirect.

    Every editable column is overwritten from the submitted form:
    text fields are stripped and stored as NULL when blank, integer fields
    are stored as NULL when blank, and checkbox fields are true only when
    the submitted value is ``"on"``.

    Args:
        request: incoming request carrying the form data.
        spec_id: primary key of the server_specifics row to update.
        db: database session injected through ``get_db``.

    Returns:
        A 303 redirect to ``/login`` (no user), to ``/specifics`` (edit not
        permitted) or, after saving, to the list filtered on the saved
        application type and anchored on the row. A 400 HTML fragment is
        returned when an integer field holds a non-numeric value.
    """
    from urllib.parse import quote

    user = get_current_user(request)
    if not user:
        # 303 makes the browser follow with GET. The default (307) would
        # replay this POST, form body included, against the target URL.
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "specifics"):
        return RedirectResponse(url="/specifics", status_code=303)

    form = await request.form()

    def val(k):
        # Blank and whitespace-only inputs are both stored as NULL.
        v = (form.get(k) or "").strip()
        return v or None

    def bval(k):
        return form.get(k) == "on"

    def ival(k):
        # Raises ValueError on non-numeric input; handled below.
        v = (form.get(k) or "").strip()
        return int(v) if v else None

    try:
        params = {
            "id": spec_id, "app_type": val("app_type"),
            "reboot_order": ival("reboot_order"), "stop_order": ival("stop_order"),
            "reboot_order_note": val("reboot_order_note"),
            "patch_wave": ival("patch_wave"), "patch_wave_group": val("patch_wave_group"),
            "patch_wave_delay": ival("patch_wave_delay_days"), "patch_wave_note": val("patch_wave_note"),
            "cmd_before_patch": val("cmd_before_patch"), "cmd_after_patch": val("cmd_after_patch"),
            "cmd_before_reboot": val("cmd_before_reboot"), "cmd_after_reboot": val("cmd_after_reboot"),
            "stop_command": val("stop_command"), "start_command": val("start_command"),
            "stop_user": val("stop_user"), "start_user": val("start_user"),
            "is_cluster": bval("is_cluster"), "cluster_role": val("cluster_role"),
            "cluster_note": val("cluster_note"),
            "is_db": bval("is_db"), "db_type": val("db_type"), "db_note": val("db_note"),
            "is_middleware": bval("is_middleware"), "mw_type": val("mw_type"), "mw_note": val("mw_note"),
            "has_agent_special": bval("has_agent_special"), "agent_note": val("agent_note"),
            "has_service_critical": bval("has_service_critical"), "service_note": val("service_note"),
            "needs_manual_step": bval("needs_manual_step"), "manual_step_detail": val("manual_step_detail"),
            "kernel_update_blocked": bval("kernel_update_blocked"),
            "kernel_block_reason": val("kernel_block_reason"),
            "reboot_min_interval": ival("reboot_min_interval"), "reboot_delay": ival("reboot_delay"),
            "sentinel": bval("sentinel"), "ip_fwd": bval("ip_fwd"),
            "rolling": bval("rolling"), "rolling_note": val("rolling_note"),
            "auto_restart": bval("auto_restart"), "pog": val("patch_order_group"),
            "extra_excludes": val("extra_excludes"), "patch_excludes": val("patch_excludes"),
            "no_reboot": val("no_reboot"), "note": val("note"),
        }
    except ValueError:
        # Reject the request instead of failing with an unhandled error;
        # nothing has been written to the database at this point.
        return HTMLResponse("<p>Valeur numerique invalide</p>", status_code=400)

    db.execute(text("""
UPDATE server_specifics SET
app_type = :app_type, reboot_order = :reboot_order, stop_order = :stop_order, reboot_order_note = :reboot_order_note,
patch_wave = :patch_wave, patch_wave_group = :patch_wave_group, patch_wave_delay_days = :patch_wave_delay, patch_wave_note = :patch_wave_note,
cmd_before_patch = :cmd_before_patch, cmd_after_patch = :cmd_after_patch,
cmd_before_reboot = :cmd_before_reboot, cmd_after_reboot = :cmd_after_reboot,
stop_command = :stop_command, start_command = :start_command,
stop_user = :stop_user, start_user = :start_user,
is_cluster = :is_cluster, cluster_role = :cluster_role, cluster_note = :cluster_note,
is_db = :is_db, db_type = :db_type, db_note = :db_note,
is_middleware = :is_middleware, mw_type = :mw_type, mw_note = :mw_note,
has_agent_special = :has_agent_special, agent_note = :agent_note,
has_service_critical = :has_service_critical, service_note = :service_note,
needs_manual_step = :needs_manual_step, manual_step_detail = :manual_step_detail,
kernel_update_blocked = :kernel_update_blocked, kernel_block_reason = :kernel_block_reason,
reboot_min_interval_minutes = :reboot_min_interval, reboot_delay_minutes = :reboot_delay,
sentinel_disable_required = :sentinel, ip_forwarding_required = :ip_fwd,
rolling_update = :rolling, rolling_update_note = :rolling_note,
auto_restart = :auto_restart, patch_order_group = :pog,
extra_excludes = :extra_excludes, patch_excludes = :patch_excludes,
no_reboot_reason = :no_reboot, note = :note
WHERE id = :id
"""), params)
    db.commit()

    # Read back the stored app_type to redirect to the matching filter + anchor.
    saved = db.execute(text("SELECT app_type FROM server_specifics WHERE id = :id"),
                       {"id": spec_id}).fetchone()
    at = saved.app_type if saved and saved.app_type else ""
    if at:
        # Percent-encode the value so characters such as "&", "#" or "%"
        # cannot break the query string or the fragment.
        encoded_type = quote(at, safe="")
        url = f"/specifics?msg=saved&app_type={encoded_type}#row-{spec_id}"
    else:
        url = f"/specifics?msg=saved#row-{spec_id}"
    return RedirectResponse(url=url, status_code=303)
@router.post("/specifics/add")
async def specific_add(request: Request, db=Depends(get_db),
                       hostname: str = Form(...), app_type: str = Form("")):
    """Create a server_specifics row for the server with the given hostname.

    Args:
        request: incoming request.
        db: database session injected through ``get_db``.
        hostname: hostname of an existing server; surrounding whitespace is
            ignored and the lookup is case-insensitive.
        app_type: optional application type; stored as NULL when blank.

    Returns:
        A 303 redirect in every case: to ``/login`` (no user), to
        ``/specifics`` (edit not permitted), or to ``/specifics`` with
        ``msg=not_found`` (unknown hostname), ``msg=exists`` (the server
        already has a row) or ``msg=added``.
    """
    user = get_current_user(request)
    if not user:
        # 303 makes the browser follow with GET. The default (307) would
        # replay this POST, form body included, against the target URL.
        return RedirectResponse(url="/login", status_code=303)
    perms = get_user_perms(db, user)
    if not can_edit(perms, "specifics"):
        return RedirectResponse(url="/specifics", status_code=303)

    row = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname) = LOWER(:h)"),
                     {"h": hostname.strip()}).fetchone()
    if not row:
        return RedirectResponse(url="/specifics?msg=not_found", status_code=303)

    existing = db.execute(text("SELECT id FROM server_specifics WHERE server_id = :sid"),
                          {"sid": row.id}).fetchone()
    if existing:
        return RedirectResponse(url="/specifics?msg=exists", status_code=303)

    # Strip before the blank check so "   " is stored as NULL, matching how
    # the save handler normalises text fields.
    db.execute(text(
        "INSERT INTO server_specifics (server_id, app_type) VALUES (:sid, :at)"
    ), {"sid": row.id, "at": app_type.strip() or None})
    db.commit()
    return RedirectResponse(url="/specifics?msg=added", status_code=303)