Compare commits

...

11 Commits

12 changed files with 1463 additions and 256 deletions

View File

@ -6,7 +6,7 @@ from starlette.middleware.base import BaseHTTPMiddleware
from .config import APP_NAME, APP_VERSION from .config import APP_NAME, APP_VERSION
from .dependencies import get_current_user, get_user_perms from .dependencies import get_current_user, get_user_perms
from .database import SessionLocal, SessionLocalDemo from .database import SessionLocal, SessionLocalDemo
from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications, patch_history
class PermissionsMiddleware(BaseHTTPMiddleware): class PermissionsMiddleware(BaseHTTPMiddleware):
@ -64,6 +64,7 @@ app.include_router(qualys_tags.router)
app.include_router(quickwin.router) app.include_router(quickwin.router)
app.include_router(referentiel.router) app.include_router(referentiel.router)
app.include_router(patching.router) app.include_router(patching.router)
app.include_router(patch_history.router)
app.include_router(applications.router) app.include_router(applications.router)

View File

@ -1,4 +1,4 @@
"""Router Historique patching — vue de patch_history""" """Router Historique patching — vue unifiee patch_history + quickwin_entries"""
from fastapi import APIRouter, Request, Depends, Query from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates from fastapi.templating import Jinja2Templates
@ -12,27 +12,50 @@ templates = Jinja2Templates(directory="app/templates")
@router.get("/patching/historique", response_class=HTMLResponse) @router.get("/patching/historique", response_class=HTMLResponse)
async def patch_history_page(request: Request, db=Depends(get_db), async def patch_history_page(request: Request, db=Depends(get_db),
year: int = Query(None), week: int = Query(None), year: str = Query(""), week: str = Query(""),
hostname: str = Query(None), page: int = Query(1)): hostname: str = Query(""), source: str = Query(""),
os_family: str = Query(""), zone: str = Query(""),
domain: str = Query(""), intervenant: str = Query(""),
page: str = Query("1")):
user = get_current_user(request) user = get_current_user(request)
if not user: if not user:
return RedirectResponse(url="/login") return RedirectResponse(url="/login")
from datetime import datetime from datetime import datetime
if not year: year = int(year) if year and year.isdigit() else datetime.now().year
year = datetime.now().year week = int(week) if week and week.isdigit() else None
page = int(page) if page and page.isdigit() else 1
hostname = hostname.strip() or None
source = source.strip() or None
os_family = os_family.strip() or None
zone = zone.strip() or None
domain = domain.strip() or None
intervenant = intervenant.strip() or None
per_page = 100 per_page = 100
offset = (page - 1) * per_page offset = (page - 1) * per_page
# KPIs
kpis = {} kpis = {}
kpis["total"] = db.execute(text( kpis["total_ph"] = db.execute(text(
"SELECT COUNT(*) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y" "SELECT COUNT(*) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar() ), {"y": year}).scalar()
kpis["servers"] = db.execute(text( kpis["total_qw"] = db.execute(text("""
"SELECT COUNT(DISTINCT server_id) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y" SELECT COUNT(*) FROM quickwin_entries qe
), {"y": year}).scalar() JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
"""), {"y": year}).scalar()
kpis["total"] = kpis["total_ph"] + kpis["total_qw"]
kpis["servers"] = db.execute(text("""
SELECT COUNT(DISTINCT sid) FROM (
SELECT server_id AS sid FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y
UNION
SELECT qe.server_id FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
) u
"""), {"y": year}).scalar()
kpis["patchables"] = db.execute(text( kpis["patchables"] = db.execute(text(
"SELECT COUNT(*) FROM servers WHERE etat='Production' AND patch_os_owner='secops'" "SELECT COUNT(*) FROM servers WHERE etat='Production' AND patch_os_owner='secops'"
)).scalar() )).scalar()
@ -41,56 +64,172 @@ async def patch_history_page(request: Request, db=Depends(get_db),
WHERE s.etat='Production' AND s.patch_os_owner='secops' WHERE s.etat='Production' AND s.patch_os_owner='secops'
AND NOT EXISTS (SELECT 1 FROM patch_history ph AND NOT EXISTS (SELECT 1 FROM patch_history ph
WHERE ph.server_id=s.id AND EXTRACT(YEAR FROM ph.date_patch)=:y) WHERE ph.server_id=s.id AND EXTRACT(YEAR FROM ph.date_patch)=:y)
AND NOT EXISTS (SELECT 1 FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.server_id=s.id AND qe.status='patched' AND qr.year=:y)
"""), {"y": year}).scalar() """), {"y": year}).scalar()
kpis["coverage_pct"] = round((kpis["servers"] / kpis["patchables"] * 100), 1) if kpis["patchables"] else 0 kpis["coverage_pct"] = round((kpis["servers"] / kpis["patchables"] * 100), 1) if kpis["patchables"] else 0
# Par semaine by_source = {}
by_source["import"] = db.execute(text(
"SELECT COUNT(*) FROM patch_history WHERE campaign_id IS NULL AND EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
by_source["standard"] = db.execute(text("""
SELECT COUNT(*) FROM patch_history ph
JOIN campaigns c ON ph.campaign_id=c.id
WHERE c.campaign_type='standard' AND EXTRACT(YEAR FROM ph.date_patch)=:y
"""), {"y": year}).scalar()
by_source["quickwin"] = kpis["total_qw"]
by_week = db.execute(text(""" by_week = db.execute(text("""
SELECT TO_CHAR(date_patch, 'IW') as week_num, SELECT week_num, SUM(cnt)::int as servers FROM (
COUNT(DISTINCT server_id) as servers SELECT TO_CHAR(date_patch, 'IW') as week_num, COUNT(DISTINCT server_id) as cnt
FROM patch_history FROM patch_history
WHERE EXTRACT(YEAR FROM date_patch)=:y WHERE EXTRACT(YEAR FROM date_patch)=:y
GROUP BY TO_CHAR(date_patch, 'IW') GROUP BY TO_CHAR(date_patch, 'IW')
ORDER BY week_num UNION ALL
SELECT LPAD(qr.week_number::text, 2, '0') as week_num, COUNT(DISTINCT qe.server_id) as cnt
FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
GROUP BY qr.week_number
) u GROUP BY week_num ORDER BY week_num
"""), {"y": year}).fetchall() """), {"y": year}).fetchall()
# Filtres # Listes pour les filtres (selon annee courante)
where = ["EXTRACT(YEAR FROM ph.date_patch)=:y"] filter_opts = {}
filter_opts["os"] = [r.os for r in db.execute(text("""
SELECT DISTINCT s.os_family as os FROM servers s
WHERE s.os_family IS NOT NULL AND s.os_family <> ''
ORDER BY 1
""")).fetchall()]
filter_opts["zones"] = [r.zone for r in db.execute(text("""
SELECT DISTINCT z.name as zone FROM zones z ORDER BY 1
""")).fetchall()]
filter_opts["domains"] = [r.dom for r in db.execute(text("""
SELECT DISTINCT d.name as dom FROM domains d ORDER BY 1
""")).fetchall()]
filter_opts["intervenants"] = [r.interv for r in db.execute(text("""
SELECT DISTINCT intervenant_name as interv FROM patch_history
WHERE intervenant_name IS NOT NULL AND intervenant_name <> ''
ORDER BY 1
""")).fetchall()]
where_ph = ["EXTRACT(YEAR FROM ph.date_patch)=:y"]
where_qw = ["qr.year=:y", "qe.status='patched'"]
params = {"y": year, "limit": per_page, "offset": offset} params = {"y": year, "limit": per_page, "offset": offset}
if week: if week:
where.append("EXTRACT(WEEK FROM ph.date_patch)=:wk") where_ph.append("EXTRACT(WEEK FROM ph.date_patch)=:wk")
where_qw.append("qr.week_number=:wk")
params["wk"] = week params["wk"] = week
if hostname: if hostname:
where.append("s.hostname ILIKE :h") where_ph.append("s.hostname ILIKE :h")
where_qw.append("s.hostname ILIKE :h")
params["h"] = f"%{hostname}%" params["h"] = f"%{hostname}%"
wc = " AND ".join(where) if os_family:
where_ph.append("s.os_family=:os")
where_qw.append("s.os_family=:os")
params["os"] = os_family
if zone:
where_ph.append("z.name=:zn")
where_qw.append("z.name=:zn")
params["zn"] = zone
if domain:
where_ph.append("d.name=:dm")
where_qw.append("d.name=:dm")
params["dm"] = domain
if intervenant:
where_ph.append("ph.intervenant_name=:iv")
where_qw.append("1=0") # quickwin n'a pas ce champ
params["iv"] = intervenant
if source == "import":
where_ph.append("ph.campaign_id IS NULL")
elif source == "standard":
where_ph.append("c.campaign_type='standard'")
total_filtered = db.execute(text( wc_ph = " AND ".join(where_ph)
f"SELECT COUNT(*) FROM patch_history ph JOIN servers s ON ph.server_id=s.id WHERE {wc}" wc_qw = " AND ".join(where_qw)
), params).scalar()
rows = db.execute(text(f""" skip_qw = source in ("import", "standard") or bool(intervenant)
skip_ph = source == "quickwin"
ph_joins = """
JOIN servers s ON ph.server_id=s.id
LEFT JOIN zones z ON s.zone_id=z.id
LEFT JOIN domain_environments de ON s.domain_env_id=de.id
LEFT JOIN domains d ON de.domain_id=d.id
LEFT JOIN campaigns c ON ph.campaign_id=c.id
"""
qw_joins = """
JOIN quickwin_runs qr ON qe.run_id=qr.id
JOIN servers s ON qe.server_id=s.id
LEFT JOIN zones z ON s.zone_id=z.id
LEFT JOIN domain_environments de ON s.domain_env_id=de.id
LEFT JOIN domains d ON de.domain_id=d.id
"""
count_parts = []
if not skip_ph:
count_parts.append(f"SELECT COUNT(*) FROM patch_history ph {ph_joins} WHERE {wc_ph}")
if not skip_qw:
count_parts.append(f"SELECT COUNT(*) FROM quickwin_entries qe {qw_joins} WHERE {wc_qw}")
count_sql = " + ".join(f"({p})" for p in count_parts) if count_parts else "0"
total_filtered = db.execute(text(f"SELECT {count_sql}"), params).scalar()
union_parts = []
if not skip_ph:
union_parts.append(f"""
SELECT s.id as sid, s.hostname, s.os_family, s.etat, SELECT s.id as sid, s.hostname, s.os_family, s.etat,
ph.date_patch, ph.status, ph.notes, ph.date_patch, ph.status, ph.notes, ph.intervenant_name,
z.name as zone z.name as zone, d.name as domain_name,
FROM patch_history ph CASE WHEN ph.campaign_id IS NULL THEN 'import'
JOIN servers s ON ph.server_id = s.id ELSE COALESCE(c.campaign_type, 'standard') END as source_type,
LEFT JOIN zones z ON s.zone_id = z.id c.id as campaign_id, c.label as campaign_label,
WHERE {wc} NULL::int as run_id, NULL::text as run_label
ORDER BY ph.date_patch DESC FROM patch_history ph {ph_joins}
WHERE {wc_ph}
""")
if not skip_qw:
union_parts.append(f"""
SELECT s.id as sid, s.hostname, s.os_family, s.etat,
qe.patch_date as date_patch, qe.status, qe.notes,
NULL::text as intervenant_name,
z.name as zone, d.name as domain_name,
'quickwin' as source_type,
NULL::int as campaign_id, NULL::text as campaign_label,
qr.id as run_id, qr.label as run_label
FROM quickwin_entries qe {qw_joins}
WHERE {wc_qw}
""")
if not union_parts:
union_parts.append("""SELECT NULL::int as sid, NULL as hostname, NULL as os_family, NULL as etat,
NULL::timestamptz as date_patch, NULL as status, NULL as notes, NULL as intervenant_name,
NULL as zone, NULL as domain_name, NULL as source_type,
NULL::int as campaign_id, NULL as campaign_label, NULL::int as run_id, NULL as run_label
WHERE 1=0""")
union_sql = " UNION ALL ".join(union_parts)
rows = db.execute(text(f"""
SELECT * FROM ({union_sql}) combined
ORDER BY date_patch DESC NULLS LAST
LIMIT :limit OFFSET :offset LIMIT :limit OFFSET :offset
"""), params).fetchall() """), params).fetchall()
# Années dispo
years = db.execute(text(""" years = db.execute(text("""
SELECT DISTINCT EXTRACT(YEAR FROM date_patch)::int as y SELECT DISTINCT y FROM (
FROM patch_history ORDER BY y DESC SELECT EXTRACT(YEAR FROM date_patch)::int as y FROM patch_history
UNION
SELECT year as y FROM quickwin_runs
) u ORDER BY y DESC
""")).fetchall() """)).fetchall()
return templates.TemplateResponse("patch_history.html", { return templates.TemplateResponse("patch_history.html", {
"request": request, "user": user, "app_name": APP_NAME, "request": request, "user": user, "app_name": APP_NAME,
"kpis": kpis, "by_week": by_week, "rows": rows, "kpis": kpis, "by_week": by_week, "by_source": by_source,
"year": year, "week": week, "hostname": hostname, "rows": rows, "year": year, "week": week, "hostname": hostname,
"page": page, "per_page": per_page, "total_filtered": total_filtered, "source": source, "os_family": os_family, "zone": zone,
"years": [y.y for y in years], "domain": domain, "intervenant": intervenant,
"filter_opts": filter_opts, "page": page, "per_page": per_page,
"total_filtered": total_filtered, "years": [y.y for y in years],
}) })

View File

@ -89,6 +89,7 @@
{% if p.campaigns %}<a href="/campaigns" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'campaigns' in path and 'assignments' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Campagnes</a>{% endif %} {% if p.campaigns %}<a href="/campaigns" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'campaigns' in path and 'assignments' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Campagnes</a>{% endif %}
{% if p.servers in ('edit','admin') or p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/config-exclusions" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'config-exclusions' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Config exclusions</a>{% endif %} {% if p.servers in ('edit','admin') or p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/config-exclusions" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'config-exclusions' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Config exclusions</a>{% endif %}
{% if p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/validations" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/validations' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Validations</a>{% endif %} {% if p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/validations" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/validations' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Validations</a>{% endif %}
<a href="/patching/historique" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/historique' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Historique</a>
{# Quickwin sous-groupe #} {# Quickwin sous-groupe #}
{% if p.campaigns or p.quickwin %} {% if p.campaigns or p.quickwin %}

View File

@ -0,0 +1,160 @@
{% extends 'base.html' %}
{% block title %}Historique patching{% endblock %}
{% block content %}
<div class="flex justify-between items-center mb-4">
<div>
<h2 class="text-xl font-bold text-cyber-accent">Historique patching</h2>
<p class="text-xs text-gray-500 mt-1">Vue unifiée : imports xlsx + campagnes standard + QuickWin.</p>
</div>
<div class="flex gap-2">
{% for y in years %}<a href="?year={{ y }}" class="btn-sm {% if y == year %}bg-cyber-accent text-black{% else %}bg-cyber-border text-gray-300{% endif %} px-3 py-1 text-xs">{{ y }}</a>{% endfor %}
</div>
</div>
<!-- KPIs -->
<div style="display:flex;flex-wrap:wrap;gap:8px;margin-bottom:16px;">
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyber-accent">{{ kpis.total }}</div>
<div class="text-xs text-gray-500">Events {{ year }}</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyber-green">{{ kpis.servers }}</div>
<div class="text-xs text-gray-500">Serveurs distincts</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-white">{{ kpis.patchables }}</div>
<div class="text-xs text-gray-500">Patchables SecOps</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold {% if kpis.never > 0 %}text-cyber-red{% else %}text-cyber-green{% endif %}">{{ kpis.never }}</div>
<div class="text-xs text-gray-500">Jamais patchés {{ year }}</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold {% if kpis.coverage_pct >= 80 %}text-cyber-green{% elif kpis.coverage_pct >= 50 %}text-cyber-yellow{% else %}text-cyber-red{% endif %}">{{ kpis.coverage_pct }}%</div>
<div class="text-xs text-gray-500">Couverture</div>
</div>
</div>
<!-- Répartition par source -->
<div style="display:flex;flex-wrap:wrap;gap:8px;margin-bottom:16px;">
<a href="?year={{ year }}&source=import" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-blue-400">{{ by_source.import }}</div>
<div class="text-xs text-gray-500">Import xlsx</div>
</a>
<a href="?year={{ year }}&source=standard" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyan-400">{{ by_source.standard }}</div>
<div class="text-xs text-gray-500">Campagnes standard</div>
</a>
<a href="?year={{ year }}&source=quickwin" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-purple-400">{{ by_source.quickwin }}</div>
<div class="text-xs text-gray-500">QuickWin</div>
</a>
</div>
<!-- Graphique par semaine -->
{% if by_week %}
<div class="card p-4 mb-4">
<h3 class="text-sm font-bold text-cyber-accent mb-3">Serveurs patchés par semaine ({{ year }})</h3>
<div style="display:flex;align-items:flex-end;gap:2px;height:120px;">
{% set max_val = by_week|map(attribute='servers')|max %}
{% for w in by_week %}
<a href="?year={{ year }}&week={{ w.week_num|int }}" title="S{{ w.week_num }} : {{ w.servers }} serveur(s)" style="flex:1;display:flex;flex-direction:column;align-items:center;min-width:0;">
<div style="width:100%;background:{% if week and week == w.week_num|int %}#00ff88{% elif w.servers >= 30 %}#06b6d4{% elif w.servers >= 15 %}#0e7490{% else %}#164e63{% endif %};border-radius:2px 2px 0 0;height:{{ (w.servers / max_val * 100)|int if max_val else 0 }}px;min-height:2px;"></div>
<span style="font-size:8px;color:#6b7280;margin-top:2px;">{{ w.week_num }}</span>
</a>
{% endfor %}
</div>
</div>
{% endif %}
<!-- Filtres -->
<div class="card p-3 mb-4">
<form method="GET" class="flex gap-2 items-center flex-wrap">
<input type="hidden" name="year" value="{{ year }}">
<select name="week" class="text-xs py-1 px-2">
<option value="">Toutes semaines</option>
{% for w in by_week %}<option value="{{ w.week_num|int }}" {% if week == w.week_num|int %}selected{% endif %}>S{{ w.week_num }} ({{ w.servers }})</option>{% endfor %}
</select>
<select name="source" class="text-xs py-1 px-2">
<option value="">Toutes sources</option>
<option value="import" {% if source == 'import' %}selected{% endif %}>Import xlsx</option>
<option value="standard" {% if source == 'standard' %}selected{% endif %}>Campagne std</option>
<option value="quickwin" {% if source == 'quickwin' %}selected{% endif %}>QuickWin</option>
</select>
<select name="os_family" class="text-xs py-1 px-2">
<option value="">Tous OS</option>
{% for o in filter_opts.os %}<option value="{{ o }}" {% if os_family == o %}selected{% endif %}>{{ o }}</option>{% endfor %}
</select>
<select name="zone" class="text-xs py-1 px-2">
<option value="">Toutes zones</option>
{% for z in filter_opts.zones %}<option value="{{ z }}" {% if zone == z %}selected{% endif %}>{{ z }}</option>{% endfor %}
</select>
<select name="domain" class="text-xs py-1 px-2">
<option value="">Tous domaines</option>
{% for d in filter_opts.domains %}<option value="{{ d }}" {% if domain == d %}selected{% endif %}>{{ d }}</option>{% endfor %}
</select>
<select name="intervenant" class="text-xs py-1 px-2">
<option value="">Tous intervenants</option>
{% for i in filter_opts.intervenants %}<option value="{{ i }}" {% if intervenant == i %}selected{% endif %}>{{ i }}</option>{% endfor %}
</select>
<input type="text" name="hostname" value="{{ hostname or '' }}" placeholder="Hostname..." class="text-xs py-1 px-2" style="width:140px">
<button type="submit" class="btn-primary px-3 py-1 text-xs">Filtrer</button>
<a href="/patching/historique?year={{ year }}" class="text-xs text-gray-500 hover:text-cyber-accent">Reset</a>
<span class="text-xs text-gray-500 ml-auto">{{ total_filtered }} résultat{{ 's' if total_filtered != 1 }}</span>
</form>
</div>
<!-- Tableau -->
<div class="card overflow-x-auto">
<table class="w-full table-cyber text-xs">
<thead><tr>
<th class="p-2 text-left">Hostname</th>
<th class="p-2 text-center">OS</th>
<th class="p-2 text-center">Zone</th>
<th class="p-2 text-center">Domaine</th>
<th class="p-2 text-center">État</th>
<th class="p-2 text-center">Date</th>
<th class="p-2 text-center">Sem.</th>
<th class="p-2 text-center">Intervenant</th>
<th class="p-2 text-center">Source</th>
<th class="p-2 text-center">Status</th>
<th class="p-2 text-left">Notes</th>
</tr></thead>
<tbody>
{% for r in rows %}
<tr class="border-t border-cyber-border/30 hover:bg-cyber-hover/20">
<td class="p-2 font-mono text-cyber-accent"><a href="/servers/{{ r.sid }}" class="hover:underline">{{ r.hostname }}</a></td>
<td class="p-2 text-center text-gray-400">{{ (r.os_family or '-')[:6] }}</td>
<td class="p-2 text-center"><span class="badge {% if r.zone == 'DMZ' %}badge-red{% else %}badge-gray{% endif %}">{{ r.zone or '-' }}</span></td>
<td class="p-2 text-center text-gray-300">{{ (r.domain_name or '-')[:10] }}</td>
<td class="p-2 text-center"><span class="badge {% if r.etat == 'Production' %}badge-green{% else %}badge-yellow{% endif %}">{{ (r.etat or '-')[:6] }}</span></td>
<td class="p-2 text-center text-gray-300">{{ r.date_patch.strftime('%Y-%m-%d %H:%M') if r.date_patch else '-' }}</td>
<td class="p-2 text-center text-gray-400">{% if r.date_patch %}S{{ r.date_patch.strftime('%V') }}{% else %}-{% endif %}</td>
<td class="p-2 text-center text-gray-300">{{ r.intervenant_name or '-' }}</td>
<td class="p-2 text-center">
{% if r.source_type == 'import' %}<span class="badge" style="background:#1e3a5f;color:#60a5fa;">xlsx</span>
{% elif r.source_type == 'standard' %}<a href="/campaigns/{{ r.campaign_id }}" class="badge" style="background:#164e63;color:#22d3ee;text-decoration:none">{{ r.campaign_label or 'Campagne' }}</a>
{% elif r.source_type == 'quickwin' %}<a href="/quickwin/{{ r.run_id }}" class="badge" style="background:#3b1f5e;color:#c084fc;text-decoration:none">{{ r.run_label or 'QuickWin' }}</a>
{% else %}<span class="badge badge-gray">{{ r.source_type or '?' }}</span>{% endif %}
</td>
<td class="p-2 text-center"><span class="badge {% if r.status == 'ok' or r.status == 'patched' %}badge-green{% elif r.status == 'ko' or r.status == 'failed' %}badge-red{% else %}badge-yellow{% endif %}">{{ r.status }}</span></td>
<td class="p-2 text-gray-400" style="max-width:180px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap" title="{{ r.notes or '' }}">{{ (r.notes or '-')[:40] }}</td>
</tr>
{% endfor %}
{% if not rows %}
<tr><td colspan="11" class="p-6 text-center text-gray-500">Aucun event de patching pour ce filtre</td></tr>
{% endif %}
</tbody>
</table>
</div>
<!-- Pagination -->
{% if total_filtered > per_page %}
<div class="flex justify-center gap-2 mt-4">
{% set qs = 'year=' ~ year ~ ('&week=' ~ week if week else '') ~ ('&source=' ~ source if source else '') ~ ('&os_family=' ~ os_family if os_family else '') ~ ('&zone=' ~ zone if zone else '') ~ ('&domain=' ~ domain if domain else '') ~ ('&intervenant=' ~ intervenant if intervenant else '') ~ ('&hostname=' ~ hostname if hostname else '') %}
{% if page > 1 %}<a href="?{{ qs }}&page={{ page - 1 }}" class="btn-sm bg-cyber-border text-gray-300 px-3 py-1 text-xs">← Précédent</a>{% endif %}
<span class="text-xs text-gray-500 py-1">Page {{ page }} / {{ ((total_filtered - 1) // per_page) + 1 }}</span>
{% if page * per_page < total_filtered %}<a href="?{{ qs }}&page={{ page + 1 }}" class="btn-sm bg-cyber-border text-gray-300 px-3 py-1 text-xs">Suivant →</a>{% endif %}
</div>
{% endif %}
{% endblock %}

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,10 @@
-- Migration 2026-04-17 : ajout colonne intervenant_name a patch_history
-- pour stocker le nom d'intervenant libre provenant du xlsx (ex "Khalid", "Thierno")
-- sans FK users (car ne correspond pas forcement a un user patchcenter)
BEGIN;
ALTER TABLE patch_history ADD COLUMN IF NOT EXISTS intervenant_name varchar(100);
CREATE INDEX IF NOT EXISTS idx_ph_intervenant_name ON patch_history (intervenant_name);
COMMIT;

View File

@ -0,0 +1,44 @@
-- Migration 2026-04-17 : lier users ↔ contacts ↔ LDAP proprement (FK + index)
--
-- Avant : users.itop_person_id (int) pointe vers iTop (pas vers contacts.id)
-- -> lien indirect fragile entre users et contacts via itop_id
--
-- Apres : users.contact_id (FK propre vers contacts.id)
-- contacts.ldap_dn (trace la source AD quand le contact vient d'un import LDAP)
-- Les 3 tables sont jointes directement : users.contact_id = contacts.id
-- La source LDAP est identifiee par contacts.ldap_dn IS NOT NULL et/ou
-- users.auth_type = 'ldap'.
BEGIN;
-- 1. users.contact_id : FK vers contacts.id
ALTER TABLE users ADD COLUMN IF NOT EXISTS contact_id INTEGER;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'users_contact_id_fkey'
) THEN
ALTER TABLE users ADD CONSTRAINT users_contact_id_fkey
FOREIGN KEY (contact_id) REFERENCES contacts(id) ON DELETE SET NULL;
END IF;
END$$;
CREATE INDEX IF NOT EXISTS idx_users_contact_id ON users (contact_id);
-- 2. contacts.ldap_dn : trace provenance AD
ALTER TABLE contacts ADD COLUMN IF NOT EXISTS ldap_dn varchar(500);
CREATE INDEX IF NOT EXISTS idx_contacts_ldap_dn ON contacts (ldap_dn)
WHERE ldap_dn IS NOT NULL;
-- 3. Backfill users.contact_id depuis users.email <-> contacts.email
-- (pour les users deja presents dont l'email matche un contact)
UPDATE users u
SET contact_id = c.id
FROM contacts c
WHERE u.contact_id IS NULL
AND u.email IS NOT NULL
AND lower(u.email) = lower(c.email);
COMMENT ON COLUMN users.contact_id IS 'FK vers contacts.id — lien direct user ↔ contact (le meme email)';
COMMENT ON COLUMN contacts.ldap_dn IS 'DN AD d''ou provient ce contact (import LDAP). NULL si import iTop ou saisie manuelle';
COMMIT;

View File

@ -0,0 +1,223 @@
"""Import des membres d'un groupe AD vers la table users + lien avec contacts.
3 champs lies :
1. LDAP/AD (source : groupe AD specifique, ex. CN=secops,...)
2. contacts (par email : match existant, creation si absent)
3. users (par username=sAMAccountName, auth_type='ldap')
+ users.itop_person_id = contacts.itop_id (si contact matche)
Par defaut le groupe AD cible est :
CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe
La config LDAP (serveur, bind DN, bind pass, base DN) est lue depuis app_secrets
via ldap_service.
Usage (doit tourner depuis le poste SANEF car l'AD n'est pas joignable du lab) :
python tools/import_ldap_group_users.py
python tools/import_ldap_group_users.py --group "CN=secops,OU=...,DC=sanef,DC=groupe" --dry-run
"""
import os
import sys
import argparse
from pathlib import Path
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
DEFAULT_GROUP_DN = "CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe"
def get_ldap_config(engine):
"""Recupere la config LDAP depuis app_secrets (reutilise ldap_service)."""
from app.services.ldap_service import _get_config
with engine.connect() as conn:
return _get_config(conn)
def fetch_group_members(cfg, group_dn):
"""Retourne liste de dicts {username, name, email, dn}.
Strategie : bind service account -> search pour user dont memberOf contient group_dn.
Plus fiable que de lire group.member (limite 1500 DN par defaut).
"""
from ldap3 import Server, Connection, ALL, SUBTREE
use_ssl = cfg["server"].startswith("ldaps://")
server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl)
conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"],
auto_bind=True)
# Filter LDAP : membre direct du groupe (inclut comptes admin, meme sans mail)
search_filter = (
f"(&(objectClass=user)(objectCategory=person)"
f"(memberOf={group_dn}))"
)
conn.search(cfg["base_dn"], search_filter, search_scope=SUBTREE,
attributes=["sAMAccountName", "displayName", "mail",
"userPrincipalName", "distinguishedName",
"userAccountControl"])
members = []
for entry in conn.entries:
sam = str(entry.sAMAccountName) if entry.sAMAccountName else None
if not sam:
print(f" [SKIP] Entry sans sAMAccountName : {entry.entry_dn}")
continue
# Priorite email : mail > userPrincipalName > fallback sam@sanef.com
email = None
if entry.mail and str(entry.mail).strip():
email = str(entry.mail).strip().lower()
elif entry.userPrincipalName and str(entry.userPrincipalName).strip():
email = str(entry.userPrincipalName).strip().lower()
else:
email = f"{sam.lower()}@sanef.com"
print(f" [INFO] {sam} sans mail AD, fallback : {email}")
# Verifier si compte desactive (pour info seulement)
uac = entry.userAccountControl.value if entry.userAccountControl else 0
if isinstance(uac, int) and uac & 0x2:
print(f" [WARN] {sam} compte AD DESACTIVE (UAC={uac}) — importe quand meme")
members.append({
"username": sam.lower(),
"display_name": str(entry.displayName) if entry.displayName else sam,
"email": email,
"dn": str(entry.entry_dn),
})
conn.unbind()
return members
SQL_FIND_CONTACT = text("""
SELECT id, itop_id FROM contacts WHERE lower(email) = :email LIMIT 1
""")
SQL_INSERT_CONTACT = text("""
INSERT INTO contacts (name, email, role, team, ldap_dn,
is_active, is_verified, created_at, updated_at)
VALUES (:name, :email, 'contact_technique', 'SecOps', :ldap_dn,
true, true, now(), now())
ON CONFLICT (email) DO UPDATE SET
name = EXCLUDED.name,
ldap_dn = EXCLUDED.ldap_dn,
updated_at = now()
RETURNING id, itop_id
""")
SQL_FIND_USER = text("""
SELECT id FROM users WHERE username = :username LIMIT 1
""")
SQL_INSERT_USER = text("""
INSERT INTO users (username, display_name, email, role, auth_type,
is_active, contact_id, itop_person_id,
created_at, updated_at)
VALUES (:username, :display_name, :email, 'operator', 'ldap',
true, :contact_id, :itop_pid,
now(), now())
""")
SQL_UPDATE_USER = text("""
UPDATE users SET
display_name = :display_name,
email = :email,
auth_type = 'ldap',
-- is_active PRESERVE : jamais reactive un user desactive manuellement
contact_id = COALESCE(:contact_id, contact_id),
itop_person_id = COALESCE(:itop_pid, itop_person_id),
updated_at = now()
WHERE id = :uid
""")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--group", default=DEFAULT_GROUP_DN,
help=f"DN du groupe AD (defaut: {DEFAULT_GROUP_DN})")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
print(f"[INFO] Groupe cible : {args.group}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
cfg = get_ldap_config(engine)
if not cfg["enabled"]:
print("[ERR] LDAP desactive dans app_secrets (ldap_enabled != true).")
sys.exit(1)
if not cfg["server"] or not cfg["base_dn"]:
print("[ERR] LDAP non configure (server ou base_dn manquant).")
sys.exit(1)
print(f"[INFO] LDAP server : {cfg['server']} base_dn : {cfg['base_dn']}")
try:
members = fetch_group_members(cfg, args.group)
except Exception as e:
print(f"[ERR] LDAP search failed : {e}")
sys.exit(2)
print(f"[INFO] Membres AD retrouves : {len(members)}")
for m in members[:5]:
print(f" {m['username']:20s} {m['email']:40s} {m['display_name']}")
if len(members) > 5:
print(f" ... ({len(members) - 5} autres)")
if args.dry_run:
print("[DRY-RUN] Aucun write")
return
inserted_u = updated_u = created_c = linked_itop = linked_contact = 0
with engine.begin() as conn:
for m in members:
# 1. Contact : find or create (stocke ldap_dn pour tracer source AD)
row = conn.execute(SQL_FIND_CONTACT, {"email": m["email"]}).fetchone()
if row:
contact_id, itop_id = row
# Mettre a jour ldap_dn si contact deja existant sans la trace
conn.execute(text(
"UPDATE contacts SET ldap_dn = COALESCE(ldap_dn, :dn) WHERE id = :cid"
), {"dn": m["dn"], "cid": contact_id})
else:
r = conn.execute(SQL_INSERT_CONTACT, {
"name": m["display_name"],
"email": m["email"],
"ldap_dn": m["dn"],
}).fetchone()
contact_id, itop_id = r
created_c += 1
# 2. User : upsert + lien contact_id (FK) + itop_person_id (historique)
u = conn.execute(SQL_FIND_USER, {"username": m["username"]}).fetchone()
params = {
"username": m["username"],
"display_name": m["display_name"],
"email": m["email"],
"contact_id": contact_id,
"itop_pid": itop_id, # None si contact sans itop_id
}
if u:
conn.execute(SQL_UPDATE_USER, {**params, "uid": u[0]})
updated_u += 1
else:
conn.execute(SQL_INSERT_USER, params)
inserted_u += 1
linked_contact += 1
if itop_id:
linked_itop += 1
print(f"[OK] Termine :")
print(f" users : INSERT {inserted_u} UPDATE {updated_u}")
print(f" contacts : CREATE {created_c}")
print(f" links : users.contact_id = {linked_contact}")
print(f" users.itop_person_id = {linked_itop}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,275 @@
"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE).
Perimetre : 2025 + 2026 uniquement.
- Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem)
- S02..S52 (weekly 2026 : nom de cellule VERT = patche)
Regles :
- Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche
- Date : col N (14) ; Heure : col O (15)
- Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00
- La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch
Usage :
python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run]
"""
import os
import re
import sys
import glob
import argparse
from datetime import datetime, time, date, timedelta
from pathlib import Path
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
# Normalisation des noms d'intervenants (xlsx libre -> canonique)
INTERVENANT_MAP = {
"sophie/joel": "Joel",
"joel/sophie": "Joel",
}
def normalize_intervenant(name):
if not name:
return None
s = str(name).strip()
return INTERVENANT_MAP.get(s.lower(), s)
def is_green(cell):
"""True si la cellule a un fond vert (dominante G > R et G > B)."""
if cell.fill is None or cell.fill.fgColor is None:
return False
fc = cell.fill.fgColor
rgb = None
if fc.type == "rgb" and fc.rgb:
rgb = fc.rgb.upper()
elif fc.type == "theme":
# Themes Office 9/6 = green-ish accents
return fc.theme in (9, 6)
if not rgb or len(rgb) < 6:
return False
try:
rr = int(rgb[-6:-4], 16)
gg = int(rgb[-4:-2], 16)
bb = int(rgb[-2:], 16)
except ValueError:
return False
return gg > 120 and gg > rr + 30 and gg > bb + 30
def parse_week_num(sheet_name):
m = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip())
return int(m.group(1)) if m else None
def monday_of_iso_week(year, week):
jan4 = date(year, 1, 4)
start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
return start
def parse_hour(val):
if val is None:
return None
if isinstance(val, time):
return val
if isinstance(val, datetime):
return val.time()
s = str(val).strip().lower().replace("h", ":")
m = re.match(r"(\d{1,2})(?::(\d{2}))?", s)
if not m:
return None
hh = int(m.group(1))
mm = int(m.group(2) or 0)
if 0 <= hh < 24 and 0 <= mm < 60:
return time(hh, mm)
return None
def parse_date_cell(val):
if val is None:
return None
if isinstance(val, datetime):
return val
if isinstance(val, date):
return datetime.combine(val, time(0, 0))
s = str(val).strip()
m = re.match(r"(\d{2})/(\d{2})/(\d{4})", s)
if m:
try:
return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)))
except Exception:
return None
return None
def find_xlsx():
for p in [
ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx",
ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx",
]:
if p.exists():
return str(p)
hits = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx"))
return hits[0] if hits else None
def collect_events(wb, hosts):
"""Retourne liste dicts patch_history : {sid, dt, status, notes}.
3 champs toujours renseignes : semaine (dans notes), date (date_patch::date),
heure (date_patch::time 00:00 si inconnue).
"""
events = []
stats = {"histo_2025_s1": 0, "histo_2025_s2": 0,
"weekly": 0, "no_server": 0, "weekly_no_color": 0}
# --- Histo-2025 : col B (2) Intervenant, col L (12) date S1, col M (13) flag S1, col O (15) date S2, col P (16) flag S2
if "Histo-2025" in wb.sheetnames:
ws = wb["Histo-2025"]
for row_idx in range(2, ws.max_row + 1):
hn = ws.cell(row=row_idx, column=1).value
if not hn:
continue
sid = hosts.get(str(hn).strip().lower())
if not sid:
stats["no_server"] += 1
continue
interv = ws.cell(row=row_idx, column=2).value
interv = str(interv).strip() if interv else None
date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value)
flag_s1 = ws.cell(row=row_idx, column=13).value
if flag_s1 and isinstance(flag_s1, int) and flag_s1 >= 1:
dt = date_s1 or datetime(2025, 6, 30, 0, 0)
events.append({"sid": sid, "dt": dt, "status": "ok",
"notes": f"Histo-2025 S1 (x{flag_s1})",
"interv": interv})
stats["histo_2025_s1"] += 1
date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value)
flag_s2 = ws.cell(row=row_idx, column=16).value
if flag_s2 and isinstance(flag_s2, int) and flag_s2 >= 1:
dt = date_s2 or datetime(2025, 12, 31, 0, 0)
events.append({"sid": sid, "dt": dt, "status": "ok",
"notes": f"Histo-2025 S2 (x{flag_s2})",
"interv": interv})
stats["histo_2025_s2"] += 1
# --- Weekly sheets S02..S52 : nom colore VERT = patche (2026)
for sname in wb.sheetnames:
wk = parse_week_num(sname)
if wk is None or not (1 <= wk <= 53):
continue
ws = wb[sname]
fallback_monday = monday_of_iso_week(2026, wk)
for row_idx in range(2, ws.max_row + 1):
hn_cell = ws.cell(row=row_idx, column=1)
hn = hn_cell.value
if not hn or not any(c.isalpha() for c in str(hn)):
continue
if not is_green(hn_cell):
stats["weekly_no_color"] += 1
continue
hn_norm = str(hn).strip().split(".")[0].lower()
sid = hosts.get(hn_norm)
if not sid:
stats["no_server"] += 1
continue
interv = ws.cell(row=row_idx, column=2).value
interv = str(interv).strip() if interv else None
# col N (14) = Date, col O (15) = Heure
date_val = ws.cell(row=row_idx, column=14).value
hour_val = ws.cell(row=row_idx, column=15).value
dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0))
hr = parse_hour(hour_val)
if hr:
dt_base = datetime.combine(dt_base.date(), hr)
# sinon : heure = 00:00 par defaut (deja dans dt_base)
# Skip si date de patch dans le futur (cellule coloree en avance)
if dt_base > datetime.now():
stats["weekly_future"] = stats.get("weekly_future", 0) + 1
continue
events.append({"sid": sid, "dt": dt_base, "status": "ok",
"notes": f"Semaine {wk:02d} 2026",
"interv": interv})
stats["weekly"] += 1
return events, stats
def main():
parser = argparse.ArgumentParser()
parser.add_argument("xlsx", nargs="?", default=None)
parser.add_argument("--truncate", action="store_true",
help="TRUNCATE patch_history avant import (source de verite)")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
xlsx = args.xlsx or find_xlsx()
if not xlsx or not os.path.exists(xlsx):
print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
wb = openpyxl.load_workbook(xlsx, data_only=True)
print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}")
with engine.begin() as conn:
hosts = {}
for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall():
hosts[r.hostname.lower()] = r.id
print(f"[INFO] Servers en DB: {len(hosts)}")
events, stats = collect_events(wb, hosts)
print("[INFO] Events detectes:")
for k, v in stats.items():
print(f" {v:5d} {k}")
print(f"[INFO] TOTAL events: {len(events)}")
if args.dry_run:
print("[DRY-RUN] Aucun write")
return
if args.truncate:
print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE")
conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE"))
inserted = skipped = 0
for ev in events:
existing = conn.execute(text(
"SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt"
), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone()
if existing:
skipped += 1
continue
conn.execute(text("""
INSERT INTO patch_history (server_id, date_patch, status, notes, intervenant_name)
VALUES (:sid, :dt, :status, :notes, :interv)
"""), ev)
inserted += 1
print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,213 @@
"""Import planning annuel patching depuis Planning Patching 2026_ayoub.xlsx feuille Planning.
Mapping colonnes feuille Planning :
A : domaine+env (ex Infrastructure HPROD, Peage PROD, FL Prod)
B : Patch N marker (cycle) OU semaine NN (ligne data)
C : plage dates DD/MM/YYYY ... DD/MM/YYYY OU Gel
D : ferie (datetime) OU Gel OU texte
Structure cible table patch_planning :
year, week_number, week_code, week_start, week_end, cycle,
domain_code (FK domains), env_scope, status, note
Usage :
python tools/import_planning_xlsx.py [chemin_fichier.xlsx]
"""
import os
import sys
import re
import glob
from pathlib import Path
from datetime import date, datetime, timedelta
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def parse_label(a):
"""Retourne liste (domain_code, env_scope) pour le libelle col A.
Un libelle peut mapper sur plusieurs domaines (ex BI + Gestion) ou
un scope combine (prod_pilot pour Peage HPROD / PROD Pilote).
"""
if not a:
return []
lo = a.lower()
if "bi" in lo and "gestion" in lo:
return [("BI", "all"), ("GESTION", "all")]
if "peage" in lo or "p\xe9age" in lo:
if "pilot" in lo:
return [("PEA", "prod_pilot")]
if "hprod" in lo:
return [("PEA", "hprod")]
if "prod" in lo:
return [("PEA", "prod")]
if "infrastructure" in lo:
if "hprod" in lo:
return [("INFRASTRUC", "hprod")]
return [("INFRASTRUC", "prod")]
if "trafic" in lo:
if "hprod" in lo:
return [("trafic", "hprod")]
return [("trafic", "prod")]
if lo.startswith("fl"):
if "pre-prod" in lo or "pr\xe9-prod" in lo or "preprod" in lo or "pr\xe9prod" in lo:
return [("FL", "pilot")]
if "prod" in lo and "pre" not in lo and "pr\xe9" not in lo:
return [("FL", "prod")]
return [("FL", "hprod")]
return []
def parse_dates(c_val, year):
"""Parse col C. Retourne (week_start, week_end, is_freeze)."""
if not c_val:
return None, None, False
s = str(c_val).strip()
if s.lower() == "gel":
return None, None, True
m = re.search(r"(\d{2})/(\d{2})/(\d{4}).*?(\d{2})/(\d{2})/(\d{4})", s)
if m:
d1 = date(int(m.group(3)), int(m.group(2)), int(m.group(1)))
d2 = date(int(m.group(6)), int(m.group(5)), int(m.group(4)))
return d1, d2, False
return None, None, False
def iso_week_dates(year, week):
"""Fallback : dates debut/fin semaine ISO depuis year+week."""
jan4 = date(year, 1, 4)
start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
return start, start + timedelta(days=6)
def parse_note(d_val):
if d_val is None:
return None
if isinstance(d_val, (datetime, date)):
dd = d_val.date() if isinstance(d_val, datetime) else d_val
return f"Ferie : {dd.strftime('%d/%m/%Y')}"
s = str(d_val).strip()
if not s or s.lower() == "gel":
return None
return s
def parse_planning(xlsx_path, year_default=2026):
wb = openpyxl.load_workbook(xlsx_path, data_only=True)
if "Planning" not in wb.sheetnames:
raise SystemExit(f"[ERR] Sheet Planning introuvable. Sheets: {wb.sheetnames}")
ws = wb["Planning"]
rows = []
current_cycle = None
for row in ws.iter_rows(values_only=True):
a = row[0] if len(row) > 0 else None
b = row[1] if len(row) > 1 else None
c = row[2] if len(row) > 2 else None
d = row[3] if len(row) > 3 else None
if b and re.match(r"^\s*Patch\s+\d+\s*$", str(b), re.I):
m = re.search(r"\d+", str(b))
current_cycle = int(m.group(0)) if m else None
continue
if not b:
continue
m = re.match(r"^\s*semaine\s+(\d+)\s*$", str(b), re.I)
if not m:
continue
week_number = int(m.group(1))
year = year_default
week_code = f"S{week_number:02d}"
d1, d2, is_freeze = parse_dates(c, year)
if not d1:
d1, d2 = iso_week_dates(year, week_number)
note = parse_note(d)
if is_freeze and note is None:
note = "Gel"
if is_freeze:
status = "freeze"
else:
status = "open" if a else "empty"
targets = parse_label(a)
if not targets:
targets = [(None, "all")]
for dom, env in targets:
rows.append({
"year": year,
"week_number": week_number,
"week_code": week_code,
"week_start": d1,
"week_end": d2,
"cycle": current_cycle,
"domain_code": dom,
"env_scope": env,
"status": status,
"note": note,
})
return rows
SQL_INSERT = text("""
INSERT INTO patch_planning
(year, week_number, week_code, week_start, week_end, cycle,
domain_code, env_scope, status, note)
VALUES
(:year, :week_number, :week_code, :week_start, :week_end, :cycle,
:domain_code, :env_scope, :status, :note)
""")
def main():
if len(sys.argv) > 1:
xlsx = sys.argv[1]
else:
xlsx = None
for p in [
ROOT / "deploy" / "Planning Patching 2026_ayoub.xlsx",
ROOT / "deploy" / "Planning_Patching_2026_ayoub.xlsx",
]:
if p.exists():
xlsx = str(p)
break
if not xlsx:
candidates = glob.glob(str(ROOT / "deploy" / "*Planning*ayoub*.xlsx"))
xlsx = candidates[0] if candidates else None
if not xlsx or not os.path.exists(xlsx):
print("[ERR] Fichier Planning introuvable. Place-le dans deploy/ (ex: deploy/Planning Patching 2026_ayoub.xlsx)")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
rows = parse_planning(xlsx)
print(f"[INFO] Lignes parses: {len(rows)}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
inserted = 0
with engine.begin() as conn:
for r in rows:
conn.execute(SQL_INSERT, r)
inserted += 1
print(f"[OK] Termine - INSERT: {inserted}")
print("[INFO] Verifs :")
print(" SELECT week_code, domain_code, env_scope, status FROM patch_planning ORDER BY year, week_number, domain_code;")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,141 @@
"""Verifie + etablit les 3 liens : patch_history <-> users <-> contacts.
Contexte :
- patch_history.intervenant_name : texte libre venant du xlsx (ex "Khalid", "Mouaad")
- users.id : FK cible pour patch_history.intervenant_id
- users.contact_id : FK vers contacts.id
- contacts.ldap_dn : trace source AD
Matching : on tente d'apparier patch_history.intervenant_name a users.display_name
(ex "Khalid" -> "MOUTAOUAKIL-ext Khalid (admin)") en cherchant le prenom comme token.
Usage :
python tools/link_patch_history_intervenants.py # verif seule
python tools/link_patch_history_intervenants.py --apply # UPDATE FK
"""
import os
import sys
import argparse
from sqlalchemy import create_engine, text
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def report_state(conn):
print("\n=== ETAT ACTUEL DES 3 TABLES ===")
r = conn.execute(text("""
SELECT
(SELECT COUNT(*) FROM users) AS users_total,
(SELECT COUNT(*) FROM users WHERE auth_type='ldap') AS users_ldap,
(SELECT COUNT(*) FROM users WHERE contact_id IS NOT NULL) AS users_with_contact,
(SELECT COUNT(*) FROM contacts) AS contacts_total,
(SELECT COUNT(*) FROM contacts WHERE ldap_dn IS NOT NULL) AS contacts_with_ldap,
(SELECT COUNT(*) FROM patch_history) AS ph_total,
(SELECT COUNT(*) FROM patch_history WHERE intervenant_name IS NOT NULL) AS ph_with_name,
(SELECT COUNT(*) FROM patch_history WHERE intervenant_id IS NOT NULL) AS ph_with_user_fk
""")).fetchone()
print(f" users : total={r.users_total} | ldap={r.users_ldap} | lie contact={r.users_with_contact}")
print(f" contacts: total={r.contacts_total} | avec ldap_dn={r.contacts_with_ldap}")
print(f" patch_history : total={r.ph_total} | avec intervenant_name={r.ph_with_name} "
f"| avec intervenant_id (FK users)={r.ph_with_user_fk}")
print("\n=== DISTRIBUTION patch_history.intervenant_name ===")
for row in conn.execute(text("""
SELECT intervenant_name, COUNT(*) AS n
FROM patch_history WHERE intervenant_name IS NOT NULL
GROUP BY 1 ORDER BY 2 DESC
""")).fetchall():
print(f" {row.n:5d} {row.intervenant_name}")
print("\n=== USERS LDAP (candidats FK) ===")
for row in conn.execute(text("""
SELECT u.username, u.display_name, u.email, c.name AS contact_name,
CASE WHEN c.ldap_dn IS NOT NULL THEN 'LDAP' ELSE '-' END AS src
FROM users u LEFT JOIN contacts c ON u.contact_id=c.id
WHERE u.auth_type='ldap' ORDER BY u.username
""")).fetchall():
print(f" {row.username:15s} | {row.display_name or '-':45s} | {row.email:30s} | {row.src}")
def propose_mapping(conn):
"""Retourne dict {intervenant_name -> user_id} en matchant par prenom."""
users = conn.execute(text("""
SELECT id, username, display_name FROM users WHERE auth_type='ldap'
""")).fetchall()
names = conn.execute(text("""
SELECT DISTINCT intervenant_name FROM patch_history
WHERE intervenant_name IS NOT NULL
""")).fetchall()
mapping = {}
for name_row in names:
n = name_row.intervenant_name
if not n:
continue
n_lo = n.strip().lower()
# Matchs d'exclusion - collectives
if n_lo in ("secops", "secops-team", "secops team"):
continue
candidates = []
for u in users:
dn = (u.display_name or "").lower()
# Token match : "khalid" dans "moutaouakil-ext khalid (admin)"
if f" {n_lo} " in f" {dn} " or dn.endswith(f" {n_lo}") or dn.startswith(f"{n_lo} "):
candidates.append(u)
# Cas Joel : display_name peut contenir "Joël" avec accent
elif n_lo == "joel" and ("joël" in dn or "joel" in dn):
candidates.append(u)
if len(candidates) == 1:
mapping[n] = candidates[0].id
elif len(candidates) > 1:
print(f" [AMBIG] '{n}' matche {len(candidates)} users : {[c.username for c in candidates]}")
else:
print(f" [MISS] '{n}' -> aucun user LDAP trouve (peut-etre pas dans groupe secops)")
return mapping
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true",
help="Applique vraiment le UPDATE FK (par defaut : dry-run verif)")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
with engine.begin() as conn:
report_state(conn)
print("\n=== MATCHING intervenant_name -> users.id ===")
mapping = propose_mapping(conn)
print(f"\n {len(mapping)} correspondance(s) unique(s) trouvees :")
for name, uid in mapping.items():
u = conn.execute(text("SELECT display_name, username FROM users WHERE id=:i"),
{"i": uid}).fetchone()
print(f" '{name}' -> #{uid} {u.username} ({u.display_name})")
if not args.apply:
print("\n[DRY-RUN] Rien ecrit. Relance avec --apply pour UPDATE patch_history.intervenant_id")
return
print("\n=== APPLY : UPDATE patch_history.intervenant_id ===")
total_updated = 0
for name, uid in mapping.items():
r = conn.execute(text("""
UPDATE patch_history SET intervenant_id = :uid
WHERE intervenant_name = :name AND intervenant_id IS NULL
"""), {"uid": uid, "name": name})
print(f" '{name}' -> user #{uid} : {r.rowcount} lignes")
total_updated += r.rowcount
print(f"\n[OK] Total UPDATE : {total_updated} lignes")
# Re-verif apres
print("\n=== ETAT APRES APPLY ===")
report_state(conn)
if __name__ == "__main__":
main()