Compare commits

...

11 Commits

12 changed files with 1463 additions and 256 deletions

View File

@ -6,7 +6,7 @@ from starlette.middleware.base import BaseHTTPMiddleware
from .config import APP_NAME, APP_VERSION
from .dependencies import get_current_user, get_user_perms
from .database import SessionLocal, SessionLocalDemo
from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications
from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications, patch_history
class PermissionsMiddleware(BaseHTTPMiddleware):
@ -64,6 +64,7 @@ app.include_router(qualys_tags.router)
app.include_router(quickwin.router)
app.include_router(referentiel.router)
app.include_router(patching.router)
app.include_router(patch_history.router)
app.include_router(applications.router)

View File

@ -1,4 +1,4 @@
"""Router Historique patching — vue de patch_history"""
"""Router Historique patching — vue unifiee patch_history + quickwin_entries"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
@ -12,27 +12,50 @@ templates = Jinja2Templates(directory="app/templates")
@router.get("/patching/historique", response_class=HTMLResponse)
async def patch_history_page(request: Request, db=Depends(get_db),
year: int = Query(None), week: int = Query(None),
hostname: str = Query(None), page: int = Query(1)):
year: str = Query(""), week: str = Query(""),
hostname: str = Query(""), source: str = Query(""),
os_family: str = Query(""), zone: str = Query(""),
domain: str = Query(""), intervenant: str = Query(""),
page: str = Query("1")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
from datetime import datetime
if not year:
year = datetime.now().year
year = int(year) if year and year.isdigit() else datetime.now().year
week = int(week) if week and week.isdigit() else None
page = int(page) if page and page.isdigit() else 1
hostname = hostname.strip() or None
source = source.strip() or None
os_family = os_family.strip() or None
zone = zone.strip() or None
domain = domain.strip() or None
intervenant = intervenant.strip() or None
per_page = 100
offset = (page - 1) * per_page
# KPIs
kpis = {}
kpis["total"] = db.execute(text(
kpis["total_ph"] = db.execute(text(
"SELECT COUNT(*) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
kpis["servers"] = db.execute(text(
"SELECT COUNT(DISTINCT server_id) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
kpis["total_qw"] = db.execute(text("""
SELECT COUNT(*) FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
"""), {"y": year}).scalar()
kpis["total"] = kpis["total_ph"] + kpis["total_qw"]
kpis["servers"] = db.execute(text("""
SELECT COUNT(DISTINCT sid) FROM (
SELECT server_id AS sid FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y
UNION
SELECT qe.server_id FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
) u
"""), {"y": year}).scalar()
kpis["patchables"] = db.execute(text(
"SELECT COUNT(*) FROM servers WHERE etat='Production' AND patch_os_owner='secops'"
)).scalar()
@ -41,56 +64,172 @@ async def patch_history_page(request: Request, db=Depends(get_db),
WHERE s.etat='Production' AND s.patch_os_owner='secops'
AND NOT EXISTS (SELECT 1 FROM patch_history ph
WHERE ph.server_id=s.id AND EXTRACT(YEAR FROM ph.date_patch)=:y)
AND NOT EXISTS (SELECT 1 FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.server_id=s.id AND qe.status='patched' AND qr.year=:y)
"""), {"y": year}).scalar()
kpis["coverage_pct"] = round((kpis["servers"] / kpis["patchables"] * 100), 1) if kpis["patchables"] else 0
# Par semaine
by_source = {}
by_source["import"] = db.execute(text(
"SELECT COUNT(*) FROM patch_history WHERE campaign_id IS NULL AND EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
by_source["standard"] = db.execute(text("""
SELECT COUNT(*) FROM patch_history ph
JOIN campaigns c ON ph.campaign_id=c.id
WHERE c.campaign_type='standard' AND EXTRACT(YEAR FROM ph.date_patch)=:y
"""), {"y": year}).scalar()
by_source["quickwin"] = kpis["total_qw"]
by_week = db.execute(text("""
SELECT TO_CHAR(date_patch, 'IW') as week_num,
COUNT(DISTINCT server_id) as servers
SELECT week_num, SUM(cnt)::int as servers FROM (
SELECT TO_CHAR(date_patch, 'IW') as week_num, COUNT(DISTINCT server_id) as cnt
FROM patch_history
WHERE EXTRACT(YEAR FROM date_patch)=:y
GROUP BY TO_CHAR(date_patch, 'IW')
ORDER BY week_num
UNION ALL
SELECT LPAD(qr.week_number::text, 2, '0') as week_num, COUNT(DISTINCT qe.server_id) as cnt
FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
GROUP BY qr.week_number
) u GROUP BY week_num ORDER BY week_num
"""), {"y": year}).fetchall()
# Filtres
where = ["EXTRACT(YEAR FROM ph.date_patch)=:y"]
# Listes pour les filtres (selon annee courante)
filter_opts = {}
filter_opts["os"] = [r.os for r in db.execute(text("""
SELECT DISTINCT s.os_family as os FROM servers s
WHERE s.os_family IS NOT NULL AND s.os_family <> ''
ORDER BY 1
""")).fetchall()]
filter_opts["zones"] = [r.zone for r in db.execute(text("""
SELECT DISTINCT z.name as zone FROM zones z ORDER BY 1
""")).fetchall()]
filter_opts["domains"] = [r.dom for r in db.execute(text("""
SELECT DISTINCT d.name as dom FROM domains d ORDER BY 1
""")).fetchall()]
filter_opts["intervenants"] = [r.interv for r in db.execute(text("""
SELECT DISTINCT intervenant_name as interv FROM patch_history
WHERE intervenant_name IS NOT NULL AND intervenant_name <> ''
ORDER BY 1
""")).fetchall()]
where_ph = ["EXTRACT(YEAR FROM ph.date_patch)=:y"]
where_qw = ["qr.year=:y", "qe.status='patched'"]
params = {"y": year, "limit": per_page, "offset": offset}
if week:
where.append("EXTRACT(WEEK FROM ph.date_patch)=:wk")
where_ph.append("EXTRACT(WEEK FROM ph.date_patch)=:wk")
where_qw.append("qr.week_number=:wk")
params["wk"] = week
if hostname:
where.append("s.hostname ILIKE :h")
where_ph.append("s.hostname ILIKE :h")
where_qw.append("s.hostname ILIKE :h")
params["h"] = f"%{hostname}%"
wc = " AND ".join(where)
if os_family:
where_ph.append("s.os_family=:os")
where_qw.append("s.os_family=:os")
params["os"] = os_family
if zone:
where_ph.append("z.name=:zn")
where_qw.append("z.name=:zn")
params["zn"] = zone
if domain:
where_ph.append("d.name=:dm")
where_qw.append("d.name=:dm")
params["dm"] = domain
if intervenant:
where_ph.append("ph.intervenant_name=:iv")
where_qw.append("1=0") # quickwin n'a pas ce champ
params["iv"] = intervenant
if source == "import":
where_ph.append("ph.campaign_id IS NULL")
elif source == "standard":
where_ph.append("c.campaign_type='standard'")
total_filtered = db.execute(text(
f"SELECT COUNT(*) FROM patch_history ph JOIN servers s ON ph.server_id=s.id WHERE {wc}"
), params).scalar()
wc_ph = " AND ".join(where_ph)
wc_qw = " AND ".join(where_qw)
rows = db.execute(text(f"""
skip_qw = source in ("import", "standard") or bool(intervenant)
skip_ph = source == "quickwin"
ph_joins = """
JOIN servers s ON ph.server_id=s.id
LEFT JOIN zones z ON s.zone_id=z.id
LEFT JOIN domain_environments de ON s.domain_env_id=de.id
LEFT JOIN domains d ON de.domain_id=d.id
LEFT JOIN campaigns c ON ph.campaign_id=c.id
"""
qw_joins = """
JOIN quickwin_runs qr ON qe.run_id=qr.id
JOIN servers s ON qe.server_id=s.id
LEFT JOIN zones z ON s.zone_id=z.id
LEFT JOIN domain_environments de ON s.domain_env_id=de.id
LEFT JOIN domains d ON de.domain_id=d.id
"""
count_parts = []
if not skip_ph:
count_parts.append(f"SELECT COUNT(*) FROM patch_history ph {ph_joins} WHERE {wc_ph}")
if not skip_qw:
count_parts.append(f"SELECT COUNT(*) FROM quickwin_entries qe {qw_joins} WHERE {wc_qw}")
count_sql = " + ".join(f"({p})" for p in count_parts) if count_parts else "0"
total_filtered = db.execute(text(f"SELECT {count_sql}"), params).scalar()
union_parts = []
if not skip_ph:
union_parts.append(f"""
SELECT s.id as sid, s.hostname, s.os_family, s.etat,
ph.date_patch, ph.status, ph.notes,
z.name as zone
FROM patch_history ph
JOIN servers s ON ph.server_id = s.id
LEFT JOIN zones z ON s.zone_id = z.id
WHERE {wc}
ORDER BY ph.date_patch DESC
ph.date_patch, ph.status, ph.notes, ph.intervenant_name,
z.name as zone, d.name as domain_name,
CASE WHEN ph.campaign_id IS NULL THEN 'import'
ELSE COALESCE(c.campaign_type, 'standard') END as source_type,
c.id as campaign_id, c.label as campaign_label,
NULL::int as run_id, NULL::text as run_label
FROM patch_history ph {ph_joins}
WHERE {wc_ph}
""")
if not skip_qw:
union_parts.append(f"""
SELECT s.id as sid, s.hostname, s.os_family, s.etat,
qe.patch_date as date_patch, qe.status, qe.notes,
NULL::text as intervenant_name,
z.name as zone, d.name as domain_name,
'quickwin' as source_type,
NULL::int as campaign_id, NULL::text as campaign_label,
qr.id as run_id, qr.label as run_label
FROM quickwin_entries qe {qw_joins}
WHERE {wc_qw}
""")
if not union_parts:
union_parts.append("""SELECT NULL::int as sid, NULL as hostname, NULL as os_family, NULL as etat,
NULL::timestamptz as date_patch, NULL as status, NULL as notes, NULL as intervenant_name,
NULL as zone, NULL as domain_name, NULL as source_type,
NULL::int as campaign_id, NULL as campaign_label, NULL::int as run_id, NULL as run_label
WHERE 1=0""")
union_sql = " UNION ALL ".join(union_parts)
rows = db.execute(text(f"""
SELECT * FROM ({union_sql}) combined
ORDER BY date_patch DESC NULLS LAST
LIMIT :limit OFFSET :offset
"""), params).fetchall()
# Années dispo
years = db.execute(text("""
SELECT DISTINCT EXTRACT(YEAR FROM date_patch)::int as y
FROM patch_history ORDER BY y DESC
SELECT DISTINCT y FROM (
SELECT EXTRACT(YEAR FROM date_patch)::int as y FROM patch_history
UNION
SELECT year as y FROM quickwin_runs
) u ORDER BY y DESC
""")).fetchall()
return templates.TemplateResponse("patch_history.html", {
"request": request, "user": user, "app_name": APP_NAME,
"kpis": kpis, "by_week": by_week, "rows": rows,
"year": year, "week": week, "hostname": hostname,
"page": page, "per_page": per_page, "total_filtered": total_filtered,
"years": [y.y for y in years],
"kpis": kpis, "by_week": by_week, "by_source": by_source,
"rows": rows, "year": year, "week": week, "hostname": hostname,
"source": source, "os_family": os_family, "zone": zone,
"domain": domain, "intervenant": intervenant,
"filter_opts": filter_opts, "page": page, "per_page": per_page,
"total_filtered": total_filtered, "years": [y.y for y in years],
})

View File

@ -89,6 +89,7 @@
{% if p.campaigns %}<a href="/campaigns" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'campaigns' in path and 'assignments' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Campagnes</a>{% endif %}
{% if p.servers in ('edit','admin') or p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/config-exclusions" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'config-exclusions' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Config exclusions</a>{% endif %}
{% if p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/validations" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/validations' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Validations</a>{% endif %}
<a href="/patching/historique" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/historique' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Historique</a>
{# Quickwin sous-groupe #}
{% if p.campaigns or p.quickwin %}

View File

@ -0,0 +1,160 @@
{% extends 'base.html' %}
{% block title %}Historique patching{% endblock %}
{% block content %}
<div class="flex justify-between items-center mb-4">
<div>
<h2 class="text-xl font-bold text-cyber-accent">Historique patching</h2>
<p class="text-xs text-gray-500 mt-1">Vue unifiée : imports xlsx + campagnes standard + QuickWin.</p>
</div>
<div class="flex gap-2">
{% for y in years %}<a href="?year={{ y }}" class="btn-sm {% if y == year %}bg-cyber-accent text-black{% else %}bg-cyber-border text-gray-300{% endif %} px-3 py-1 text-xs">{{ y }}</a>{% endfor %}
</div>
</div>
<!-- KPIs -->
<div style="display:flex;flex-wrap:wrap;gap:8px;margin-bottom:16px;">
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyber-accent">{{ kpis.total }}</div>
<div class="text-xs text-gray-500">Events {{ year }}</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyber-green">{{ kpis.servers }}</div>
<div class="text-xs text-gray-500">Serveurs distincts</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-white">{{ kpis.patchables }}</div>
<div class="text-xs text-gray-500">Patchables SecOps</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold {% if kpis.never > 0 %}text-cyber-red{% else %}text-cyber-green{% endif %}">{{ kpis.never }}</div>
<div class="text-xs text-gray-500">Jamais patchés {{ year }}</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold {% if kpis.coverage_pct >= 80 %}text-cyber-green{% elif kpis.coverage_pct >= 50 %}text-cyber-yellow{% else %}text-cyber-red{% endif %}">{{ kpis.coverage_pct }}%</div>
<div class="text-xs text-gray-500">Couverture</div>
</div>
</div>
<!-- Répartition par source -->
<div style="display:flex;flex-wrap:wrap;gap:8px;margin-bottom:16px;">
<a href="?year={{ year }}&source=import" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-blue-400">{{ by_source.import }}</div>
<div class="text-xs text-gray-500">Import xlsx</div>
</a>
<a href="?year={{ year }}&source=standard" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyan-400">{{ by_source.standard }}</div>
<div class="text-xs text-gray-500">Campagnes standard</div>
</a>
<a href="?year={{ year }}&source=quickwin" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-purple-400">{{ by_source.quickwin }}</div>
<div class="text-xs text-gray-500">QuickWin</div>
</a>
</div>
<!-- Graphique par semaine -->
{% if by_week %}
<div class="card p-4 mb-4">
<h3 class="text-sm font-bold text-cyber-accent mb-3">Serveurs patchés par semaine ({{ year }})</h3>
<div style="display:flex;align-items:flex-end;gap:2px;height:120px;">
{% set max_val = by_week|map(attribute='servers')|max %}
{% for w in by_week %}
<a href="?year={{ year }}&week={{ w.week_num|int }}" title="S{{ w.week_num }} : {{ w.servers }} serveur(s)" style="flex:1;display:flex;flex-direction:column;align-items:center;min-width:0;">
<div style="width:100%;background:{% if week and week == w.week_num|int %}#00ff88{% elif w.servers >= 30 %}#06b6d4{% elif w.servers >= 15 %}#0e7490{% else %}#164e63{% endif %};border-radius:2px 2px 0 0;height:{{ (w.servers / max_val * 100)|int if max_val else 0 }}px;min-height:2px;"></div>
<span style="font-size:8px;color:#6b7280;margin-top:2px;">{{ w.week_num }}</span>
</a>
{% endfor %}
</div>
</div>
{% endif %}
<!-- Filtres -->
<div class="card p-3 mb-4">
<form method="GET" class="flex gap-2 items-center flex-wrap">
<input type="hidden" name="year" value="{{ year }}">
<select name="week" class="text-xs py-1 px-2">
<option value="">Toutes semaines</option>
{% for w in by_week %}<option value="{{ w.week_num|int }}" {% if week == w.week_num|int %}selected{% endif %}>S{{ w.week_num }} ({{ w.servers }})</option>{% endfor %}
</select>
<select name="source" class="text-xs py-1 px-2">
<option value="">Toutes sources</option>
<option value="import" {% if source == 'import' %}selected{% endif %}>Import xlsx</option>
<option value="standard" {% if source == 'standard' %}selected{% endif %}>Campagne std</option>
<option value="quickwin" {% if source == 'quickwin' %}selected{% endif %}>QuickWin</option>
</select>
<select name="os_family" class="text-xs py-1 px-2">
<option value="">Tous OS</option>
{% for o in filter_opts.os %}<option value="{{ o }}" {% if os_family == o %}selected{% endif %}>{{ o }}</option>{% endfor %}
</select>
<select name="zone" class="text-xs py-1 px-2">
<option value="">Toutes zones</option>
{% for z in filter_opts.zones %}<option value="{{ z }}" {% if zone == z %}selected{% endif %}>{{ z }}</option>{% endfor %}
</select>
<select name="domain" class="text-xs py-1 px-2">
<option value="">Tous domaines</option>
{% for d in filter_opts.domains %}<option value="{{ d }}" {% if domain == d %}selected{% endif %}>{{ d }}</option>{% endfor %}
</select>
<select name="intervenant" class="text-xs py-1 px-2">
<option value="">Tous intervenants</option>
{% for i in filter_opts.intervenants %}<option value="{{ i }}" {% if intervenant == i %}selected{% endif %}>{{ i }}</option>{% endfor %}
</select>
<input type="text" name="hostname" value="{{ hostname or '' }}" placeholder="Hostname..." class="text-xs py-1 px-2" style="width:140px">
<button type="submit" class="btn-primary px-3 py-1 text-xs">Filtrer</button>
<a href="/patching/historique?year={{ year }}" class="text-xs text-gray-500 hover:text-cyber-accent">Reset</a>
<span class="text-xs text-gray-500 ml-auto">{{ total_filtered }} résultat{{ 's' if total_filtered != 1 }}</span>
</form>
</div>
<!-- Tableau -->
<div class="card overflow-x-auto">
<table class="w-full table-cyber text-xs">
<thead><tr>
<th class="p-2 text-left">Hostname</th>
<th class="p-2 text-center">OS</th>
<th class="p-2 text-center">Zone</th>
<th class="p-2 text-center">Domaine</th>
<th class="p-2 text-center">État</th>
<th class="p-2 text-center">Date</th>
<th class="p-2 text-center">Sem.</th>
<th class="p-2 text-center">Intervenant</th>
<th class="p-2 text-center">Source</th>
<th class="p-2 text-center">Status</th>
<th class="p-2 text-left">Notes</th>
</tr></thead>
<tbody>
{% for r in rows %}
<tr class="border-t border-cyber-border/30 hover:bg-cyber-hover/20">
<td class="p-2 font-mono text-cyber-accent"><a href="/servers/{{ r.sid }}" class="hover:underline">{{ r.hostname }}</a></td>
<td class="p-2 text-center text-gray-400">{{ (r.os_family or '-')[:6] }}</td>
<td class="p-2 text-center"><span class="badge {% if r.zone == 'DMZ' %}badge-red{% else %}badge-gray{% endif %}">{{ r.zone or '-' }}</span></td>
<td class="p-2 text-center text-gray-300">{{ (r.domain_name or '-')[:10] }}</td>
<td class="p-2 text-center"><span class="badge {% if r.etat == 'Production' %}badge-green{% else %}badge-yellow{% endif %}">{{ (r.etat or '-')[:6] }}</span></td>
<td class="p-2 text-center text-gray-300">{{ r.date_patch.strftime('%Y-%m-%d %H:%M') if r.date_patch else '-' }}</td>
<td class="p-2 text-center text-gray-400">{% if r.date_patch %}S{{ r.date_patch.strftime('%V') }}{% else %}-{% endif %}</td>
<td class="p-2 text-center text-gray-300">{{ r.intervenant_name or '-' }}</td>
<td class="p-2 text-center">
{% if r.source_type == 'import' %}<span class="badge" style="background:#1e3a5f;color:#60a5fa;">xlsx</span>
{% elif r.source_type == 'standard' %}<a href="/campaigns/{{ r.campaign_id }}" class="badge" style="background:#164e63;color:#22d3ee;text-decoration:none">{{ r.campaign_label or 'Campagne' }}</a>
{% elif r.source_type == 'quickwin' %}<a href="/quickwin/{{ r.run_id }}" class="badge" style="background:#3b1f5e;color:#c084fc;text-decoration:none">{{ r.run_label or 'QuickWin' }}</a>
{% else %}<span class="badge badge-gray">{{ r.source_type or '?' }}</span>{% endif %}
</td>
<td class="p-2 text-center"><span class="badge {% if r.status == 'ok' or r.status == 'patched' %}badge-green{% elif r.status == 'ko' or r.status == 'failed' %}badge-red{% else %}badge-yellow{% endif %}">{{ r.status }}</span></td>
<td class="p-2 text-gray-400" style="max-width:180px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap" title="{{ r.notes or '' }}">{{ (r.notes or '-')[:40] }}</td>
</tr>
{% endfor %}
{% if not rows %}
<tr><td colspan="11" class="p-6 text-center text-gray-500">Aucun event de patching pour ce filtre</td></tr>
{% endif %}
</tbody>
</table>
</div>
<!-- Pagination -->
{% if total_filtered > per_page %}
<div class="flex justify-center gap-2 mt-4">
{% set qs = 'year=' ~ year ~ ('&week=' ~ week if week else '') ~ ('&source=' ~ source if source else '') ~ ('&os_family=' ~ os_family if os_family else '') ~ ('&zone=' ~ zone if zone else '') ~ ('&domain=' ~ domain if domain else '') ~ ('&intervenant=' ~ intervenant if intervenant else '') ~ ('&hostname=' ~ hostname if hostname else '') %}
{% if page > 1 %}<a href="?{{ qs }}&page={{ page - 1 }}" class="btn-sm bg-cyber-border text-gray-300 px-3 py-1 text-xs">← Précédent</a>{% endif %}
<span class="text-xs text-gray-500 py-1">Page {{ page }} / {{ ((total_filtered - 1) // per_page) + 1 }}</span>
{% if page * per_page < total_filtered %}<a href="?{{ qs }}&page={{ page + 1 }}" class="btn-sm bg-cyber-border text-gray-300 px-3 py-1 text-xs">Suivant →</a>{% endif %}
</div>
{% endif %}
{% endblock %}

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,10 @@
-- Migration 2026-04-17: add an intervenant_name column to patch_history.
-- Stores the free-form technician name coming from the xlsx import
-- (e.g. "Khalid", "Thierno") WITHOUT an FK to users, because the name
-- does not necessarily map to a patchcenter user account.
BEGIN;
ALTER TABLE patch_history ADD COLUMN IF NOT EXISTS intervenant_name varchar(100);
-- Index supports the "filter by intervenant" dropdown in the history view.
CREATE INDEX IF NOT EXISTS idx_ph_intervenant_name ON patch_history (intervenant_name);
COMMIT;

View File

@ -0,0 +1,44 @@
-- Migration 2026-04-17: link users <-> contacts <-> LDAP cleanly (FK + index).
--
-- Before: users.itop_person_id (int) points at iTop (not at contacts.id)
--         -> fragile indirect link between users and contacts via itop_id.
--
-- After:  users.contact_id (proper FK to contacts.id)
--         contacts.ldap_dn (records the AD source when the contact comes
--         from an LDAP import).
--         The 3 tables are joined directly: users.contact_id = contacts.id.
--         The LDAP origin is identified by contacts.ldap_dn IS NOT NULL
--         and/or users.auth_type = 'ldap'.
BEGIN;
-- 1. users.contact_id: FK to contacts.id
ALTER TABLE users ADD COLUMN IF NOT EXISTS contact_id INTEGER;
-- ADD CONSTRAINT has no IF NOT EXISTS, so guard with a pg_constraint lookup
-- to keep the migration idempotent.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1 FROM pg_constraint WHERE conname = 'users_contact_id_fkey'
    ) THEN
        ALTER TABLE users ADD CONSTRAINT users_contact_id_fkey
            FOREIGN KEY (contact_id) REFERENCES contacts(id) ON DELETE SET NULL;
    END IF;
END$$;
CREATE INDEX IF NOT EXISTS idx_users_contact_id ON users (contact_id);
-- 2. contacts.ldap_dn: records AD provenance
ALTER TABLE contacts ADD COLUMN IF NOT EXISTS ldap_dn varchar(500);
-- Partial index: only LDAP-sourced contacts are indexed.
CREATE INDEX IF NOT EXISTS idx_contacts_ldap_dn ON contacts (ldap_dn)
    WHERE ldap_dn IS NOT NULL;
-- 3. Backfill users.contact_id from users.email <-> contacts.email
--    (for already-present users whose email matches a contact).
UPDATE users u
SET contact_id = c.id
FROM contacts c
WHERE u.contact_id IS NULL
  AND u.email IS NOT NULL
  AND lower(u.email) = lower(c.email);
COMMENT ON COLUMN users.contact_id IS 'FK vers contacts.id — lien direct user ↔ contact (le meme email)';
COMMENT ON COLUMN contacts.ldap_dn IS 'DN AD d''ou provient ce contact (import LDAP). NULL si import iTop ou saisie manuelle';
COMMIT;

View File

@ -0,0 +1,223 @@
"""Import des membres d'un groupe AD vers la table users + lien avec contacts.
3 champs lies :
1. LDAP/AD (source : groupe AD specifique, ex. CN=secops,...)
2. contacts (par email : match existant, creation si absent)
3. users (par username=sAMAccountName, auth_type='ldap')
+ users.itop_person_id = contacts.itop_id (si contact matche)
Par defaut le groupe AD cible est :
CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe
La config LDAP (serveur, bind DN, bind pass, base DN) est lue depuis app_secrets
via ldap_service.
Usage (doit tourner depuis le poste SANEF car l'AD n'est pas joignable du lab) :
python tools/import_ldap_group_users.py
python tools/import_ldap_group_users.py --group "CN=secops,OU=...,DC=sanef,DC=groupe" --dry-run
"""
import os
import sys
import argparse
from pathlib import Path
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
DEFAULT_GROUP_DN = "CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe"
def get_ldap_config(engine):
    """Read the LDAP configuration from app_secrets (reuses ldap_service).

    Returns whatever ``ldap_service._get_config`` yields — a mapping with at
    least the keys used by this script: enabled, server, base_dn, bind_dn,
    bind_pwd.
    """
    # Function-scope import: the app package is only needed when the script
    # actually runs, not at module import time.
    from app.services.ldap_service import _get_config
    with engine.connect() as conn:
        return _get_config(conn)
def fetch_group_members(cfg, group_dn):
    """Return a list of dicts {username, display_name, email, dn} for an AD group.

    Strategy: bind with the service account, then search for user objects whose
    ``memberOf`` contains ``group_dn``. More reliable than reading
    ``group.member`` (AD truncates that attribute at 1500 DNs by default).

    cfg: LDAP config mapping (server, bind_dn, bind_pwd, base_dn).
    group_dn: full DN of the target AD group.
    """
    from ldap3 import Server, Connection, ALL, SUBTREE
    # ldaps:// scheme implies an SSL connection.
    use_ssl = cfg["server"].startswith("ldaps://")
    server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl)
    conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"],
                      auto_bind=True)
    # LDAP filter: direct members of the group (includes admin accounts, even without mail).
    # NOTE(review): group_dn is interpolated unescaped — acceptable for a trusted
    # CLI argument, but a DN containing filter metacharacters would need
    # ldap3.utils.conv.escape_filter_chars; confirm if input ever becomes untrusted.
    search_filter = (
        f"(&(objectClass=user)(objectCategory=person)"
        f"(memberOf={group_dn}))"
    )
    conn.search(cfg["base_dn"], search_filter, search_scope=SUBTREE,
                attributes=["sAMAccountName", "displayName", "mail",
                            "userPrincipalName", "distinguishedName",
                            "userAccountControl"])
    members = []
    for entry in conn.entries:
        sam = str(entry.sAMAccountName) if entry.sAMAccountName else None
        if not sam:
            print(f" [SKIP] Entry sans sAMAccountName : {entry.entry_dn}")
            continue
        # Email priority: mail > userPrincipalName > fallback sam@sanef.com
        email = None
        if entry.mail and str(entry.mail).strip():
            email = str(entry.mail).strip().lower()
        elif entry.userPrincipalName and str(entry.userPrincipalName).strip():
            email = str(entry.userPrincipalName).strip().lower()
        else:
            email = f"{sam.lower()}@sanef.com"
            print(f" [INFO] {sam} sans mail AD, fallback : {email}")
        # Detect disabled AD accounts (UAC bit 0x2 = ACCOUNTDISABLE) —
        # informational only, the member is still imported.
        uac = entry.userAccountControl.value if entry.userAccountControl else 0
        if isinstance(uac, int) and uac & 0x2:
            print(f" [WARN] {sam} compte AD DESACTIVE (UAC={uac}) — importe quand meme")
        members.append({
            "username": sam.lower(),
            "display_name": str(entry.displayName) if entry.displayName else sam,
            "email": email,
            "dn": str(entry.entry_dn),
        })
    conn.unbind()
    return members
# --- Prepared SQL statements (all parameterized via SQLAlchemy text()) ---

# Find an existing contact by case-insensitive email.
SQL_FIND_CONTACT = text("""
SELECT id, itop_id FROM contacts WHERE lower(email) = :email LIMIT 1
""")

# Create a contact for an AD member; on email collision, refresh name/ldap_dn.
SQL_INSERT_CONTACT = text("""
INSERT INTO contacts (name, email, role, team, ldap_dn,
is_active, is_verified, created_at, updated_at)
VALUES (:name, :email, 'contact_technique', 'SecOps', :ldap_dn,
true, true, now(), now())
ON CONFLICT (email) DO UPDATE SET
name = EXCLUDED.name,
ldap_dn = EXCLUDED.ldap_dn,
updated_at = now()
RETURNING id, itop_id
""")

# Find an existing user by exact username (sAMAccountName, lowercased by caller).
SQL_FIND_USER = text("""
SELECT id FROM users WHERE username = :username LIMIT 1
""")

# Create a new LDAP-backed user linked to its contact.
SQL_INSERT_USER = text("""
INSERT INTO users (username, display_name, email, role, auth_type,
is_active, contact_id, itop_person_id,
created_at, updated_at)
VALUES (:username, :display_name, :email, 'operator', 'ldap',
true, :contact_id, :itop_pid,
now(), now())
""")

# Refresh an existing user from AD; COALESCE keeps previously-set links
# when the new value is NULL, and is_active is deliberately not touched.
SQL_UPDATE_USER = text("""
UPDATE users SET
display_name = :display_name,
email = :email,
auth_type = 'ldap',
-- is_active PRESERVE : jamais reactive un user desactive manuellement
contact_id = COALESCE(:contact_id, contact_id),
itop_person_id = COALESCE(:itop_pid, itop_person_id),
updated_at = now()
WHERE id = :uid
""")
def main():
    """CLI entry point: fetch AD group members, then upsert users + contacts.

    Exit codes: 1 = LDAP disabled/unconfigured, 2 = LDAP search failure.
    With --dry-run the AD members are listed but nothing is written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--group", default=DEFAULT_GROUP_DN,
                        help=f"DN du groupe AD (defaut: {DEFAULT_GROUP_DN})")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    print(f"[INFO] Groupe cible : {args.group}")
    engine = create_engine(DATABASE_URL)
    # Only echo the part after '@' so credentials never reach the console.
    print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
    cfg = get_ldap_config(engine)
    if not cfg["enabled"]:
        print("[ERR] LDAP desactive dans app_secrets (ldap_enabled != true).")
        sys.exit(1)
    if not cfg["server"] or not cfg["base_dn"]:
        print("[ERR] LDAP non configure (server ou base_dn manquant).")
        sys.exit(1)
    print(f"[INFO] LDAP server : {cfg['server']} base_dn : {cfg['base_dn']}")
    try:
        members = fetch_group_members(cfg, args.group)
    except Exception as e:
        print(f"[ERR] LDAP search failed : {e}")
        sys.exit(2)
    print(f"[INFO] Membres AD retrouves : {len(members)}")
    # Preview the first few members before any write happens.
    for m in members[:5]:
        print(f" {m['username']:20s} {m['email']:40s} {m['display_name']}")
    if len(members) > 5:
        print(f" ... ({len(members) - 5} autres)")
    if args.dry_run:
        print("[DRY-RUN] Aucun write")
        return
    inserted_u = updated_u = created_c = linked_itop = linked_contact = 0
    # Single transaction: the whole import lands atomically or not at all.
    with engine.begin() as conn:
        for m in members:
            # 1. Contact: find by email or create (store ldap_dn to trace the AD source)
            row = conn.execute(SQL_FIND_CONTACT, {"email": m["email"]}).fetchone()
            if row:
                contact_id, itop_id = row
                # Backfill ldap_dn on pre-existing contacts lacking the trace
                # (COALESCE keeps an already-recorded DN).
                conn.execute(text(
                    "UPDATE contacts SET ldap_dn = COALESCE(ldap_dn, :dn) WHERE id = :cid"
                ), {"dn": m["dn"], "cid": contact_id})
            else:
                r = conn.execute(SQL_INSERT_CONTACT, {
                    "name": m["display_name"],
                    "email": m["email"],
                    "ldap_dn": m["dn"],
                }).fetchone()
                contact_id, itop_id = r
                created_c += 1
            # 2. User: upsert + link contact_id (FK) + itop_person_id (legacy iTop link)
            u = conn.execute(SQL_FIND_USER, {"username": m["username"]}).fetchone()
            params = {
                "username": m["username"],
                "display_name": m["display_name"],
                "email": m["email"],
                "contact_id": contact_id,
                "itop_pid": itop_id,  # None if the contact has no itop_id
            }
            if u:
                conn.execute(SQL_UPDATE_USER, {**params, "uid": u[0]})
                updated_u += 1
            else:
                conn.execute(SQL_INSERT_USER, params)
                inserted_u += 1
            linked_contact += 1
            if itop_id:
                linked_itop += 1
    print(f"[OK] Termine :")
    print(f" users : INSERT {inserted_u} UPDATE {updated_u}")
    print(f" contacts : CREATE {created_c}")
    print(f" links : users.contact_id = {linked_contact}")
    print(f" users.itop_person_id = {linked_itop}")


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,275 @@
"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE).
Perimetre : 2025 + 2026 uniquement.
- Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem)
- S02..S52 (weekly 2026 : nom de cellule VERT = patche)
Regles :
- Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche
- Date : col N (14) ; Heure : col O (15)
- Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00
- La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch
Usage :
python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run]
"""
import os
import re
import sys
import glob
import argparse
from datetime import datetime, time, date, timedelta
from pathlib import Path
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
# Canonical spellings for free-form technician names found in the xlsx
# (lowercased variant -> canonical name).
INTERVENANT_MAP = {
    "sophie/joel": "Joel",
    "joel/sophie": "Joel",
}


def normalize_intervenant(name):
    """Return the canonical technician name for a free-form xlsx value.

    Empty/None input yields None; unknown names are returned stripped
    but otherwise unchanged.
    """
    if not name:
        return None
    cleaned = str(name).strip()
    canonical = INTERVENANT_MAP.get(cleaned.lower())
    return canonical if canonical is not None else cleaned
def is_green(cell):
    """Return True when the cell's background fill is green-dominant.

    A cell counts as green either through an explicit RGB fill whose green
    channel clearly dominates red and blue, or through an Office theme
    fill using theme index 9 or 6 (the green-ish accents).
    """
    fill = cell.fill
    if fill is None or fill.fgColor is None:
        return False
    color = fill.fgColor
    if color.type == "theme":
        # Office accent themes 9 and 6 are treated as green.
        return color.theme in (9, 6)
    hex_code = color.rgb.upper() if (color.type == "rgb" and color.rgb) else None
    if not hex_code or len(hex_code) < 6:
        return False
    # Last six hex digits are RRGGBB (a leading alpha byte is ignored).
    try:
        red = int(hex_code[-6:-4], 16)
        green = int(hex_code[-4:-2], 16)
        blue = int(hex_code[-2:], 16)
    except ValueError:
        return False
    return green > 120 and green - red > 30 and green - blue > 30
def parse_week_num(sheet_name):
    """Extract the week number from a weekly sheet name like "S02"/"s7".

    Returns the integer week, or None for any other sheet name.
    """
    match = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip())
    if match is None:
        return None
    return int(match.group(1))
def monday_of_iso_week(year, week):
    """Return the Monday (as a date) of the given ISO week of *year*."""
    # January 4 always falls in ISO week 1; step back to that week's
    # Monday, then forward (week - 1) whole weeks.
    anchor = date(year, 1, 4)
    week1_monday = anchor - timedelta(days=anchor.isoweekday() - 1)
    return week1_monday + timedelta(weeks=week - 1)
def parse_hour(val):
    """Coerce a spreadsheet cell value into a datetime.time, or None.

    time/datetime objects pass through directly.  Strings such as
    "14h30", "9h" or "08:15" are parsed, treating "h" as the hour
    separator; minutes default to 0.  Out-of-range values yield None.
    """
    if val is None:
        return None
    if isinstance(val, time):
        return val
    if isinstance(val, datetime):
        return val.time()
    normalized = str(val).strip().lower().replace("h", ":")
    match = re.match(r"(\d{1,2})(?::(\d{2}))?", normalized)
    if match is None:
        return None
    hours = int(match.group(1))
    minutes = int(match.group(2) or 0)
    return time(hours, minutes) if (hours < 24 and minutes < 60) else None
def parse_date_cell(val):
    """Coerce a spreadsheet cell into a datetime, or None.

    datetime passes through unchanged; a plain date is promoted to
    midnight; strings must start with DD/MM/YYYY.  Anything else —
    including impossible calendar dates like 32/01/2026 — yields None.
    """
    if val is None:
        return None
    if isinstance(val, datetime):
        return val
    if isinstance(val, date):
        return datetime.combine(val, time(0, 0))
    match = re.match(r"(\d{2})/(\d{2})/(\d{4})", str(val).strip())
    if match is None:
        return None
    day, month, year = (int(g) for g in match.groups())
    try:
        return datetime(year, month, day)
    except Exception:
        return None
def find_xlsx():
    """Locate the patching-plan workbook under deploy/, or None.

    The two known exact filenames are tried first; failing that, a glob
    tolerant of spaces vs underscores in the name is used.
    """
    candidates = (
        ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx",
        ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx",
    )
    for candidate in candidates:
        if candidate.exists():
            return str(candidate)
    matches = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx"))
    return matches[0] if matches else None
def collect_events(wb, hosts):
    """Scan the workbook and return (events, stats) for patch_history.

    events: list of dicts ready for INSERT:
        {"sid": server_id, "dt": datetime, "status": "ok",
         "notes": str, "interv": raw intervenant name or None}
    stats: counters per category (2025 semester 1/2, weekly 2026 hits,
    hostnames absent from DB, uncolored weekly rows, future-dated rows).

    Three facts are always present on an event: the week (in notes), a
    date, and a time (00:00 when the sheet leaves the hour blank).

    NOTE(review): the raw column-B value is stored without going through
    normalize_intervenant() — confirm whether raw names are intended here.
    """
    events = []
    stats = {"histo_2025_s1": 0, "histo_2025_s2": 0,
             "weekly": 0, "no_server": 0, "weekly_no_color": 0}
    # --- Histo-2025 sheet: col B (2) intervenant, col L (12) S1 date,
    # col M (13) S1 patch-count flag, col O (15) S2 date, col P (16) S2 flag.
    if "Histo-2025" in wb.sheetnames:
        ws = wb["Histo-2025"]
        for row_idx in range(2, ws.max_row + 1):
            hn = ws.cell(row=row_idx, column=1).value
            if not hn:
                continue
            sid = hosts.get(str(hn).strip().lower())
            if not sid:
                stats["no_server"] += 1
                continue
            interv = ws.cell(row=row_idx, column=2).value
            interv = str(interv).strip() if interv else None
            date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value)
            flag_s1 = ws.cell(row=row_idx, column=13).value
            # NOTE(review): only int flags count — an Excel numeric read back
            # as float (e.g. 1.0) would be skipped; confirm the sheet stores
            # whole numbers.
            if flag_s1 and isinstance(flag_s1, int) and flag_s1 >= 1:
                # Missing date -> end of first half of 2025 as fallback.
                dt = date_s1 or datetime(2025, 6, 30, 0, 0)
                events.append({"sid": sid, "dt": dt, "status": "ok",
                               "notes": f"Histo-2025 S1 (x{flag_s1})",
                               "interv": interv})
                stats["histo_2025_s1"] += 1
            date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value)
            flag_s2 = ws.cell(row=row_idx, column=16).value
            if flag_s2 and isinstance(flag_s2, int) and flag_s2 >= 1:
                # Missing date -> end of 2025 as fallback.
                dt = date_s2 or datetime(2025, 12, 31, 0, 0)
                events.append({"sid": sid, "dt": dt, "status": "ok",
                               "notes": f"Histo-2025 S2 (x{flag_s2})",
                               "interv": interv})
                stats["histo_2025_s2"] += 1
    # --- Weekly sheets S02..S52: a GREEN-filled hostname cell = patched (2026).
    for sname in wb.sheetnames:
        wk = parse_week_num(sname)
        if wk is None or not (1 <= wk <= 53):
            continue
        ws = wb[sname]
        fallback_monday = monday_of_iso_week(2026, wk)
        for row_idx in range(2, ws.max_row + 1):
            hn_cell = ws.cell(row=row_idx, column=1)
            hn = hn_cell.value
            # Skip blanks and purely-numeric/punctuation cells (headers, ids).
            if not hn or not any(c.isalpha() for c in str(hn)):
                continue
            if not is_green(hn_cell):
                stats["weekly_no_color"] += 1
                continue
            # Match on the short hostname (strip any DNS domain suffix).
            hn_norm = str(hn).strip().split(".")[0].lower()
            sid = hosts.get(hn_norm)
            if not sid:
                stats["no_server"] += 1
                continue
            interv = ws.cell(row=row_idx, column=2).value
            interv = str(interv).strip() if interv else None
            # col N (14) = date, col O (15) = hour.
            date_val = ws.cell(row=row_idx, column=14).value
            hour_val = ws.cell(row=row_idx, column=15).value
            dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0))
            hr = parse_hour(hour_val)
            if hr:
                dt_base = datetime.combine(dt_base.date(), hr)
            # otherwise the hour stays at the 00:00 default already in dt_base.
            # Skip future-dated patches (cells colored ahead of time).
            if dt_base > datetime.now():
                stats["weekly_future"] = stats.get("weekly_future", 0) + 1
                continue
            events.append({"sid": sid, "dt": dt_base, "status": "ok",
                           "notes": f"Semaine {wk:02d} 2026",
                           "interv": interv})
            stats["weekly"] += 1
    return events, stats
def main():
    """CLI entry point: parse the workbook and load patch_history.

    Flags: --truncate wipes patch_history first (the xlsx is the source
    of truth); --dry-run parses and reports without any DB write.
    Without --truncate, rows are deduped on (server_id, date_patch).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("xlsx", nargs="?", default=None)
    parser.add_argument("--truncate", action="store_true",
                        help="TRUNCATE patch_history avant import (source de verite)")
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    xlsx = args.xlsx or find_xlsx()
    if not xlsx or not os.path.exists(xlsx):
        print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.")
        sys.exit(1)
    print(f"[INFO] Fichier: {xlsx}")
    engine = create_engine(DATABASE_URL)
    # Log only the host/db part of the URL, never the credentials.
    print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
    wb = openpyxl.load_workbook(xlsx, data_only=True)
    print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}")
    with engine.begin() as conn:
        # hostname (lower-cased) -> servers.id, used to resolve xlsx rows.
        hosts = {}
        for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall():
            hosts[r.hostname.lower()] = r.id
        print(f"[INFO] Servers en DB: {len(hosts)}")
        events, stats = collect_events(wb, hosts)
        print("[INFO] Events detectes:")
        for k, v in stats.items():
            print(f" {v:5d} {k}")
        print(f"[INFO] TOTAL events: {len(events)}")
        if args.dry_run:
            # Returning here commits the (write-free) transaction.
            print("[DRY-RUN] Aucun write")
            return
        if args.truncate:
            print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE")
            conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE"))
        inserted = skipped = 0
        for ev in events:
            # Dedupe on (server_id, date_patch): first event seen wins.
            existing = conn.execute(text(
                "SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt"
            ), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone()
            if existing:
                skipped += 1
                continue
            conn.execute(text("""
                INSERT INTO patch_history (server_id, date_patch, status, notes, intervenant_name)
                VALUES (:sid, :dt, :status, :notes, :interv)
            """), ev)
            inserted += 1
    print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}")
# Allow use both as a runnable script and as an importable module.
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,213 @@
"""Import planning annuel patching depuis Planning Patching 2026_ayoub.xlsx feuille Planning.
Mapping colonnes feuille Planning :
A : domaine+env (ex Infrastructure HPROD, Peage PROD, FL Prod)
B : Patch N marker (cycle) OU semaine NN (ligne data)
C : plage dates DD/MM/YYYY ... DD/MM/YYYY OU Gel
D : ferie (datetime) OU Gel OU texte
Structure cible table patch_planning :
year, week_number, week_code, week_start, week_end, cycle,
domain_code (FK domains), env_scope, status, note
Usage :
python tools/import_planning_xlsx.py [chemin_fichier.xlsx]
"""
import os
import sys
import re
import glob
from pathlib import Path
from datetime import date, datetime, timedelta
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
# DB target: prefer the demo database, then the main one.
# NOTE(review): the final fallback embeds credentials in source control —
# confirm this is acceptable for this repo, or drop it and fail fast.
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
                or os.getenv("DATABASE_URL")
                or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def parse_label(a):
    """Map a column-A label to a list of (domain_code, env_scope) targets.

    One label can fan out to several domains (BI + Gestion) or to a
    combined scope (prod_pilot for "Peage HPROD / PROD Pilote").
    Unrecognized labels yield [].

    NOTE(review): matching is substring-based, so it is permissive
    (e.g. any "fl..." label containing "prod" — including "hprod" —
    maps to FL/prod). Confirm real-world labels cannot collide.
    """
    if not a:
        return []
    label = a.lower()
    # BI and Gestion share one planning line.
    if "bi" in label and "gestion" in label:
        return [("BI", "all"), ("GESTION", "all")]
    if "peage" in label or "p\xe9age" in label:
        if "pilot" in label:
            return [("PEA", "prod_pilot")]
        if "hprod" in label:
            return [("PEA", "hprod")]
        if "prod" in label:
            return [("PEA", "prod")]
    if "infrastructure" in label:
        return [("INFRASTRUC", "hprod")] if "hprod" in label else [("INFRASTRUC", "prod")]
    if "trafic" in label:
        return [("trafic", "hprod")] if "hprod" in label else [("trafic", "prod")]
    if label.startswith("fl"):
        pilot_markers = ("pre-prod", "pr\xe9-prod", "preprod", "pr\xe9prod")
        if any(marker in label for marker in pilot_markers):
            return [("FL", "pilot")]
        if "prod" in label and "pre" not in label and "pr\xe9" not in label:
            return [("FL", "prod")]
        return [("FL", "hprod")]
    return []
def parse_dates(c_val, year):
    """Parse planning column C into (week_start, week_end, is_freeze).

    "Gel" (case-insensitive) marks a freeze week without dates; otherwise
    the first two DD/MM/YYYY occurrences become start and end.  *year* is
    kept for interface compatibility but is not used.
    """
    if not c_val:
        return None, None, False
    raw = str(c_val).strip()
    if raw.lower() == "gel":
        return None, None, True
    found = re.search(r"(\d{2})/(\d{2})/(\d{4}).*?(\d{2})/(\d{2})/(\d{4})", raw)
    if found is None:
        return None, None, False
    g = [int(x) for x in found.groups()]
    return date(g[2], g[1], g[0]), date(g[5], g[4], g[3]), False
def iso_week_dates(year, week):
    """Fallback: return (monday, sunday) of ISO week *week* in *year*."""
    jan4 = date(year, 1, 4)  # always inside ISO week 1
    monday = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
    sunday = monday + timedelta(days=6)
    return monday, sunday
def parse_note(d_val):
    """Turn planning column D into a note string, or None.

    Dates become "Ferie : DD/MM/YYYY"; "Gel" and blank values are dropped
    (the freeze status is carried separately); any other text is returned
    trimmed.
    """
    if d_val is None:
        return None
    if isinstance(d_val, (datetime, date)):
        day = d_val.date() if isinstance(d_val, datetime) else d_val
        return f"Ferie : {day.strftime('%d/%m/%Y')}"
    text_val = str(d_val).strip()
    if text_val and text_val.lower() != "gel":
        return text_val
    return None
def parse_planning(xlsx_path, year_default=2026):
    """Parse the "Planning" sheet into a list of patch_planning row dicts.

    Row layout: column B is either a cycle marker ("Patch N") that applies
    to all following week rows, or a data row ("semaine NN"); column A
    lists the domain/env targets; column C holds the date range or "Gel";
    column D an optional holiday date or free-text note.

    A week whose label maps to no known domain is kept as a single
    (None, "all") row so the week still appears in the planning.
    Raises SystemExit if the "Planning" sheet is missing.
    """
    wb = openpyxl.load_workbook(xlsx_path, data_only=True)
    if "Planning" not in wb.sheetnames:
        raise SystemExit(f"[ERR] Sheet Planning introuvable. Sheets: {wb.sheetnames}")
    ws = wb["Planning"]
    rows = []
    current_cycle = None  # carried from the most recent "Patch N" marker row
    for row in ws.iter_rows(values_only=True):
        a = row[0] if len(row) > 0 else None
        b = row[1] if len(row) > 1 else None
        c = row[2] if len(row) > 2 else None
        d = row[3] if len(row) > 3 else None
        # Cycle marker row: remember the cycle, nothing to emit.
        if b and re.match(r"^\s*Patch\s+\d+\s*$", str(b), re.I):
            m = re.search(r"\d+", str(b))
            current_cycle = int(m.group(0)) if m else None
            continue
        if not b:
            continue
        # Only "semaine NN" rows are data rows; anything else is skipped.
        m = re.match(r"^\s*semaine\s+(\d+)\s*$", str(b), re.I)
        if not m:
            continue
        week_number = int(m.group(1))
        year = year_default
        week_code = f"S{week_number:02d}"
        d1, d2, is_freeze = parse_dates(c, year)
        if not d1:
            # No explicit range in col C -> derive from the ISO calendar.
            d1, d2 = iso_week_dates(year, week_number)
        note = parse_note(d)
        if is_freeze and note is None:
            note = "Gel"
        if is_freeze:
            status = "freeze"
        else:
            status = "open" if a else "empty"
        targets = parse_label(a)
        if not targets:
            targets = [(None, "all")]
        # One output row per (domain, scope) target for this week.
        for dom, env in targets:
            rows.append({
                "year": year,
                "week_number": week_number,
                "week_code": week_code,
                "week_start": d1,
                "week_end": d2,
                "cycle": current_cycle,
                "domain_code": dom,
                "env_scope": env,
                "status": status,
                "note": note,
            })
    return rows
SQL_INSERT = text("""
INSERT INTO patch_planning
(year, week_number, week_code, week_start, week_end, cycle,
domain_code, env_scope, status, note)
VALUES
(:year, :week_number, :week_code, :week_start, :week_end, :cycle,
:domain_code, :env_scope, :status, :note)
""")
def main():
    """CLI entry point: locate the planning xlsx and load patch_planning.

    The file can be given as argv[1]; otherwise two known filenames are
    tried under deploy/, then a permissive glob.

    NOTE(review): unlike the patch_history importer, there is no --dry-run
    and no truncate/dedupe here — re-running inserts duplicate planning
    rows. Confirm whether that is intended.
    """
    if len(sys.argv) > 1:
        xlsx = sys.argv[1]
    else:
        xlsx = None
        for p in [
            ROOT / "deploy" / "Planning Patching 2026_ayoub.xlsx",
            ROOT / "deploy" / "Planning_Patching_2026_ayoub.xlsx",
        ]:
            if p.exists():
                xlsx = str(p)
                break
        if not xlsx:
            candidates = glob.glob(str(ROOT / "deploy" / "*Planning*ayoub*.xlsx"))
            xlsx = candidates[0] if candidates else None
    if not xlsx or not os.path.exists(xlsx):
        print("[ERR] Fichier Planning introuvable. Place-le dans deploy/ (ex: deploy/Planning Patching 2026_ayoub.xlsx)")
        sys.exit(1)
    print(f"[INFO] Fichier: {xlsx}")
    rows = parse_planning(xlsx)
    print(f"[INFO] Lignes parses: {len(rows)}")
    engine = create_engine(DATABASE_URL)
    # Log only the host/db part of the URL, never the credentials.
    print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
    inserted = 0
    with engine.begin() as conn:
        for r in rows:
            conn.execute(SQL_INSERT, r)
            inserted += 1
    print(f"[OK] Termine - INSERT: {inserted}")
    print("[INFO] Verifs :")
    print(" SELECT week_code, domain_code, env_scope, status FROM patch_planning ORDER BY year, week_number, domain_code;")
# Allow use both as a runnable script and as an importable module.
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,141 @@
"""Verifie + etablit les 3 liens : patch_history <-> users <-> contacts.
Contexte :
- patch_history.intervenant_name : texte libre venant du xlsx (ex "Khalid", "Mouaad")
- users.id : FK cible pour patch_history.intervenant_id
- users.contact_id : FK vers contacts.id
- contacts.ldap_dn : trace source AD
Matching : on tente d'apparier patch_history.intervenant_name a users.display_name
(ex "Khalid" -> "MOUTAOUAKIL-ext Khalid (admin)") en cherchant le prenom comme token.
Usage :
python tools/link_patch_history_intervenants.py # verif seule
python tools/link_patch_history_intervenants.py --apply # UPDATE FK
"""
import os
import sys
import argparse
from sqlalchemy import create_engine, text
# DB target: prefer the demo database, then the main one.
# NOTE(review): the final fallback embeds credentials in source control —
# confirm this is acceptable for this repo, or drop it and fail fast.
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
                or os.getenv("DATABASE_URL")
                or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def report_state(conn):
    """Print a verification snapshot of the three linked tables.

    Shows row/link counts for users, contacts and patch_history, then the
    distribution of free-text intervenant names, then the LDAP users that
    are candidates for the intervenant FK.
    """
    print("\n=== ETAT ACTUEL DES 3 TABLES ===")
    totals = conn.execute(text("""
        SELECT
        (SELECT COUNT(*) FROM users) AS users_total,
        (SELECT COUNT(*) FROM users WHERE auth_type='ldap') AS users_ldap,
        (SELECT COUNT(*) FROM users WHERE contact_id IS NOT NULL) AS users_with_contact,
        (SELECT COUNT(*) FROM contacts) AS contacts_total,
        (SELECT COUNT(*) FROM contacts WHERE ldap_dn IS NOT NULL) AS contacts_with_ldap,
        (SELECT COUNT(*) FROM patch_history) AS ph_total,
        (SELECT COUNT(*) FROM patch_history WHERE intervenant_name IS NOT NULL) AS ph_with_name,
        (SELECT COUNT(*) FROM patch_history WHERE intervenant_id IS NOT NULL) AS ph_with_user_fk
    """)).fetchone()
    print(f" users : total={totals.users_total} | ldap={totals.users_ldap} | lie contact={totals.users_with_contact}")
    print(f" contacts: total={totals.contacts_total} | avec ldap_dn={totals.contacts_with_ldap}")
    print(f" patch_history : total={totals.ph_total} | avec intervenant_name={totals.ph_with_name} "
          f"| avec intervenant_id (FK users)={totals.ph_with_user_fk}")
    print("\n=== DISTRIBUTION patch_history.intervenant_name ===")
    name_rows = conn.execute(text("""
        SELECT intervenant_name, COUNT(*) AS n
        FROM patch_history WHERE intervenant_name IS NOT NULL
        GROUP BY 1 ORDER BY 2 DESC
    """)).fetchall()
    for rec in name_rows:
        print(f" {rec.n:5d} {rec.intervenant_name}")
    print("\n=== USERS LDAP (candidats FK) ===")
    candidate_rows = conn.execute(text("""
        SELECT u.username, u.display_name, u.email, c.name AS contact_name,
               CASE WHEN c.ldap_dn IS NOT NULL THEN 'LDAP' ELSE '-' END AS src
        FROM users u LEFT JOIN contacts c ON u.contact_id=c.id
        WHERE u.auth_type='ldap' ORDER BY u.username
    """)).fetchall()
    for cand in candidate_rows:
        print(f" {cand.username:15s} | {cand.display_name or '-':45s} | {cand.email:30s} | {cand.src}")
def propose_mapping(conn):
    """Build {intervenant_name -> users.id} by first-name token matching.

    Collective names (secops variants) are deliberately never mapped.  A
    name is mapped only when exactly ONE LDAP user's display_name contains
    it as a whole token (e.g. "khalid" in "moutaouakil-ext khalid (admin)");
    ambiguous and unmatched names are printed and left out of the result.
    """
    ldap_users = conn.execute(text("""
        SELECT id, username, display_name FROM users WHERE auth_type='ldap'
    """)).fetchall()
    name_rows = conn.execute(text("""
        SELECT DISTINCT intervenant_name FROM patch_history
        WHERE intervenant_name IS NOT NULL
    """)).fetchall()
    mapping = {}
    for name_row in name_rows:
        raw = name_row.intervenant_name
        if not raw:
            continue
        needle = raw.strip().lower()
        # Team names are excluded: they cannot map to a single person.
        if needle in ("secops", "secops-team", "secops team"):
            continue
        hits = []
        for u in ldap_users:
            display = (u.display_name or "").lower()
            # Whole-token containment; padding both strings with spaces also
            # covers tokens at the start or end of display_name.
            if f" {needle} " in f" {display} ":
                hits.append(u)
            elif needle == "joel" and ("joël" in display or "joel" in display):
                # "Joel" may appear accented in AD display names.
                hits.append(u)
        if len(hits) == 1:
            mapping[raw] = hits[0].id
        elif len(hits) > 1:
            print(f" [AMBIG] '{raw}' matche {len(hits)} users : {[c.username for c in hits]}")
        else:
            print(f" [MISS] '{raw}' -> aucun user LDAP trouve (peut-etre pas dans groupe secops)")
    return mapping
def main():
    """CLI entry point: report link state, propose a mapping, optionally apply.

    Default run is read-only verification; --apply performs the
    patch_history.intervenant_id UPDATEs (only where the FK is still NULL)
    and prints the state again afterwards.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--apply", action="store_true",
                        help="Applique vraiment le UPDATE FK (par defaut : dry-run verif)")
    args = parser.parse_args()
    engine = create_engine(DATABASE_URL)
    # Log only the host/db part of the URL, never the credentials.
    print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
    with engine.begin() as conn:
        report_state(conn)
        print("\n=== MATCHING intervenant_name -> users.id ===")
        mapping = propose_mapping(conn)
        print(f"\n {len(mapping)} correspondance(s) unique(s) trouvees :")
        for name, uid in mapping.items():
            u = conn.execute(text("SELECT display_name, username FROM users WHERE id=:i"),
                             {"i": uid}).fetchone()
            print(f" '{name}' -> #{uid} {u.username} ({u.display_name})")
        if not args.apply:
            print("\n[DRY-RUN] Rien ecrit. Relance avec --apply pour UPDATE patch_history.intervenant_id")
            return
        print("\n=== APPLY : UPDATE patch_history.intervenant_id ===")
        total_updated = 0
        for name, uid in mapping.items():
            # Only fill missing FKs; existing links are left untouched.
            r = conn.execute(text("""
                UPDATE patch_history SET intervenant_id = :uid
                WHERE intervenant_name = :name AND intervenant_id IS NULL
            """), {"uid": uid, "name": name})
            print(f" '{name}' -> user #{uid} : {r.rowcount} lignes")
            total_updated += r.rowcount
        print(f"\n[OK] Total UPDATE : {total_updated} lignes")
        # Re-check after the writes so the operator sees the effect.
        print("\n=== ETAT APRES APPLY ===")
        report_state(conn)
# Allow use both as a runnable script and as an importable module.
if __name__ == "__main__":
    main()