Compare commits


16 Commits

Author SHA1 Message Date
5fedfb5f80 Add page Tour de garde SecOps : import xlsx + table + vue hebdo + competences 2026-04-17 23:39:11 +00:00
803016458d Doc: algorithme detaille processus patching SANEF (12 sections) 2026-04-17 23:32:04 +00:00
9a72fa7eb7 Optim: fix N+1 queries itop_service (pre-load batch) + macros Jinja2 badges 2026-04-17 23:23:32 +00:00
a0f90cd719 Optim: logging structure + query_helpers.py + fix exceptions silencieuses routers 2026-04-17 23:19:18 +00:00
9097872e57 Secu: verif permissions can_view/can_edit sur endpoints HTMX detail/edit 2026-04-17 23:15:04 +00:00
89f069ddcc import_plan_patching: skip lignes avec date future (cellules coloriees a l avance) 2026-04-17 12:44:27 +00:00
2bf2fa5042 patch_history: tolere parametres vides dans les filtres (week=, source=, etc.) 2026-04-17 12:42:58 +00:00
402ed36407 import_ldap_group_users : ne reactive plus les users desactives manuellement 2026-04-17 12:32:46 +00:00
81227833c1 Add link_patch_history_intervenants : lie patch_history.intervenant_name -> users.id (FK) 2026-04-17 12:31:40 +00:00
7ec7c49c34 import_ldap_group_users : fallback UPN/sam@sanef.com si mail absent, inclut comptes admin sans mail 2026-04-17 12:26:12 +00:00
2a4c785535 Add import_ldap_group_users + FK users.contact_id + contacts.ldap_dn 2026-04-17 12:16:24 +00:00
2ab2ceabba Historique patching : filtres OS/zone/domaine/intervenant + colonnes table 2026-04-17 12:10:45 +00:00
14f809335e Add tool import_plan_patching_xlsx : historique 2025+2026 (vert = patche) 2026-04-17 11:56:32 +00:00
cfb9cf865c Register patch_history router in main.py 2026-04-17 09:41:14 +00:00
4b1794d4d1 Add page Historique patching : vue unifiee import xlsx + campagnes + quickwin 2026-04-17 08:47:25 +00:00
c9890a274f Add tools/import_planning_xlsx.py : import patch_planning depuis xlsx Ayoub 2026-04-17 08:32:56 +00:00
27 changed files with 2620 additions and 301 deletions

SANEF_PATCHING_PROCESS.md (new file, 641 lines)

@ -0,0 +1,641 @@
# SANEF Patching Process — Detailed Algorithm
> Technical document extracted from the PatchCenter code (April 2026).
> Covers the full cycle: planning → execution → validation → reporting.
---
## 1. Overview
```
PLANNING → CAMPAIGN → PREREQUISITES → MAPPING → VALIDATION → EXECUTION → POST-PATCH → HISTORY
W-2        W-1        W-1             continuous  D-1         D           D+1          continuous
```
**Actors**:
- Coordinator: creates campaigns, assigns operators, manages the schedule
- SecOps operators (6): Khalid, Mouaad, Thierno, Paul, Joel, Ayoub
- Application owner: validates non-prod post-patching
- IT department (DSI): consults the dashboard and reports
**Business rules**:
- No patching on Fridays (weekend risk)
- Window: 9h–21h (27 slots of 15 min/day)
- Budget: 35 h/week for ~20 servers
- Non-prod BEFORE prod (validation mandatory)
- Mandatory snapshot before patching (rollback possible)
- Freeze: S01, S30, S34, S51–S53 + public holidays
- Trafic prod: forbidden January–March (de-winterization period)
---
## 2. Annual planning
**Table**: `patch_planning`
**Source**: `Planning Patching 2026_ayoub.xlsx` → `tools/import_planning_xlsx.py`
### 2.1 Structure
| Field | Description |
|-------|-------------|
| `year` | 2026 |
| `week_number` | 1–53 |
| `week_code` | S01–S53 |
| `cycle` | 1 (Jan–Apr), 2 (Apr–Aug), 3 (Sep–Dec) |
| `domain_code` | FK → `domains.code` (INFRASTRUC, trafic, PEA, FL, BI, GESTION) |
| `env_scope` | prod, hprod, all, pilot, prod_pilot |
| `status` | open, freeze, holiday, empty |
### 2.2 Typical cycle
```
Cycle 1 (Patch 1): S02–S15
  S02: Infrastructure HPROD
  S03: Trafic HPROD
  S04: Trafic PROD
  S05: Infrastructure PROD
  S06–S07: empty
  S08: Péage HPROD/PROD Pilot
  S09: Péage PROD
  S10: empty
  S11: FL Test/Recette/Dev
  S12: FL Pre-Prod
  S13: BI + Gestion
  S14–S15: FL Prod
Cycle 2 (Patch 2): S16–S35 (same rotation, shifted)
Cycle 3 (Patch 3): S36–S50 (same rotation, shifted)
```
### 2.3 Planning validation algorithm
```
IF year < current_year → REJECT
IF year == current_year AND week < current_week → REJECT
IF week == current_week AND weekday > Tuesday → REJECT (too late)
week_start = ISO Monday of the week
week_end = Sunday
week_code = f"S{week:02d}"
INSERT INTO patch_planning (year, week_number, week_code, week_start, week_end,
                            cycle, domain_code, env_scope, status)
```
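The rejection rules and ISO-week arithmetic above can be sketched with the standard library (the helper names are illustrative, not taken from the PatchCenter code):

```python
from datetime import date, timedelta

def week_bounds(year: int, week: int):
    """Return (week_start, week_end, week_code) for an ISO week."""
    week_start = date.fromisocalendar(year, week, 1)   # ISO Monday
    week_end = week_start + timedelta(days=6)          # Sunday
    return week_start, week_end, f"S{week:02d}"

def validate_planning_row(year: int, week: int, today: date):
    """Mirror the rejection rules: past years/weeks, or current week after Tuesday."""
    cur_year, cur_week, cur_day = today.isocalendar()
    if year < cur_year:
        return False, "past year"
    if year == cur_year and week < cur_week:
        return False, "past week"
    if year == cur_year and week == cur_week and cur_day > 2:  # 2 = Tuesday
        return False, "too late this week"
    return True, ""
```

For example, a row for the current week is rejected from Wednesday onward, matching the "too late" rule.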
---
## 3. Campaign creation
**Tables**: `campaigns`, `patch_sessions`
**Route**: `POST /campaigns/create`
### 3.1 Selecting eligible servers
```sql
-- Eligibility criteria (all mandatory)
WHERE servers.os_family = 'linux'
AND servers.etat = 'Production'
AND servers.patch_os_owner = 'secops'
AND servers.licence_support IN ('active', 'els')
-- Domain/environment filter from the week's planning
-- If env_scope = 'prod'       → environment.name = 'Production'
-- If env_scope = 'hprod'      → environment.name != 'Production'
-- If env_scope = 'all'        → every environment of the domain
-- If env_scope = 'prod_pilot' → Production + Pilot
-- If domain_code = 'DMZ'      → also includes zone = 'DMZ'
```
### 3.2 hprod / prod partitioning
```python
hprod_servers = [s for s in eligible if s.env_name != 'Production' and s.id not in excluded]
prod_servers = [s for s in eligible if s.env_name == 'Production' and s.id not in excluded]
# Sort by (app_group, hostname) to group applications together
hprod_servers.sort(key=lambda s: (s.app_group or '', s.hostname))
prod_servers.sort(key=lambda s: (s.app_group or '', s.hostname))
```
### 3.3 Slot allocation
```
27 slots/day (15 min):
  Morning:   09h00, 09h15, ..., 12h00, 12h15 (13 slots)
  Afternoon: 14h00, 14h15, ..., 16h15, 16h30 (14 slots)
hprod days: Monday + Tuesday     → 54 slots max
prod days:  Wednesday + Thursday → 54 slots max
For each server:
  day  = days[slot_index // 27]
  time = DAILY_SLOTS[slot_index % 27]
  IF pref_patch_jour != 'indifferent'  → force that day
  IF pref_patch_heure != 'indifferent' → force that time
  IF default_intervenant_id exists → force that operator (forced_assignment=true)
INSERT INTO patch_sessions (campaign_id, server_id, status='pending',
                            date_prevue, heure_prevue, intervenant_id, forced_assignment)
```
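A minimal sketch of the slot table and the index → (day, time) mapping. The slot counts (13 + 14) are taken from the document; the exact end times are an assumption, since the listed times and counts do not fully agree, and the function names are illustrative:

```python
from datetime import time

def build_daily_slots():
    """13 morning slots from 09:00 and 14 afternoon slots from 14:00, 15 min apart."""
    slots = []
    for start_h, start_m, count in ((9, 0, 13), (14, 0, 14)):
        minutes = start_h * 60 + start_m
        for i in range(count):
            m = minutes + 15 * i
            slots.append(time(m // 60, m % 60))
    return slots

def place(slot_index: int, days):
    """Map a running slot index onto (day, time) as in the allocation loop."""
    slots = build_daily_slots()
    return days[slot_index // len(slots)], slots[slot_index % len(slots)]
```

With two hprod days, index 27 rolls over to the second day's first slot, giving the 54-slot ceiling.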
### 3.4 Automatic operator assignment
```
Rules by priority (default_assignments table):
  1. By server    (rule_type='server',    rule_value=hostname)
  2. By app_type  (rule_type='app_type',  rule_value=app_type)
  3. By app_group (rule_type='app_group', rule_value=app_group)
  4. By domain    (rule_type='domain',    rule_value=domain_code)
  5. By zone      (rule_type='zone',      rule_value=zone_name)
For each rule (priority ASC):
  MATCH unassigned sessions → SET intervenant_id = rule.user_id
app_group auto-propagation:
  IF an operator is assigned to a server with app_group X:
    → every other server with app_group X in the same campaign
      gets the same operator (automatic propagation)
```
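As a sketch, the priority resolution reduces to a first-match scan in rule-type order (the dictionary keys and the `resolve_operator` name are assumptions for illustration):

```python
# Rule types in priority order, as listed in the document.
PRIORITY = ["server", "app_type", "app_group", "domain", "zone"]

def resolve_operator(server: dict, rules: list) -> "int | None":
    """Return the user_id of the first rule matching the server, in priority order."""
    key_of = {"server": "hostname", "app_type": "app_type",
              "app_group": "app_group", "domain": "domain_code", "zone": "zone_name"}
    for rtype in PRIORITY:
        for rule in rules:
            if rule["rule_type"] == rtype and rule["rule_value"] == server.get(key_of[rtype]):
                return rule["user_id"]
    return None  # no rule matched; session stays unassigned
```

A domain-level rule therefore never overrides an app_group rule for the same server.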
### 3.5 Operator limits
```sql
-- campaign_operator_limits table (optional, per campaign)
-- If max_servers > 0 AND count >= max_servers → assignment refused
SELECT COUNT(*) FROM patch_sessions
WHERE campaign_id = :cid AND intervenant_id = :uid AND status != 'excluded'
```
---
## 4. Prerequisite checks
**Service**: `prereq_service.py`
**Route**: `POST /campaigns/{id}/check-prereqs`
### 4.1 Per-server algorithm
```
FOR each session (status='pending'):
  1. ELIGIBILITY
     IF licence_support = 'obsolete' → EXCLUDE (reason: EOL)
     IF etat != 'Production' → EXCLUDE (reason: non_patchable)
  2. TCP CONNECTIVITY (port 22)
     DNS resolution with suffixes:
       "" → ".sanef.groupe" → ".sanef-rec.fr" → ".sanef.fr"
     FOR each suffix:
       socket.connect(hostname+suffix, port=22, timeout=5)
       IF OK → prereq_ssh = 'ok', BREAK
     IF none → prereq_ssh = 'ko' → EXCLUDE (reason: creneau_inadequat)
  3. ROLLBACK METHOD
     IF machine_type = 'vm' → rollback_method = 'snapshot'
     IF machine_type = 'physical' → rollback_method = 'na'
  4. SSH CHECKS (if a key is available)
     Paramiko connection with key /opt/patchcenter/keys/id_rsa_cybglobal.pem
     a) Disk space:
        Command: df -BM --output=target,avail | grep -E '^/ |^/var'
        Thresholds: / >= 1200 MB, /var >= 800 MB
        IF KO → prereq_disk_ok = false
     b) Red Hat Satellite:
        Command: subscription-manager identity
        IF "not_registered" → prereq_satellite = 'ko'
        ELSE → prereq_satellite = 'ok'
  5. RESULT
     prereq_validated = (ssh=ok AND disk_ok != false AND eligible)
     UPDATE patch_sessions SET
       prereq_ssh, prereq_satellite, rollback_method,
       prereq_disk_root_mb, prereq_disk_var_mb, prereq_disk_ok,
       prereq_validated, prereq_date = now()
  6. AUTO-EXCLUSION
     IF prereq_ssh='ko' OR prereq_disk_ok=false OR licence='obsolete':
       UPDATE patch_sessions SET
         status = 'excluded',
         exclusion_reason = '...',
         excluded_by = 'system',
         excluded_at = now()
```
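The suffix-based connectivity probe in step 2 can be sketched as follows (the helper names are illustrative; `probe_ssh` is an untested network sketch):

```python
import socket

# DNS suffixes tried in order, as listed in the document.
SUFFIXES = ["", ".sanef.groupe", ".sanef-rec.fr", ".sanef.fr"]

def candidate_fqdns(hostname: str):
    """FQDNs tried in order for the TCP/22 reachability probe."""
    return [hostname + s for s in SUFFIXES]

def probe_ssh(hostname: str, timeout: float = 5.0) -> str:
    """Return 'ok' on the first candidate accepting TCP/22, else 'ko'."""
    for fqdn in candidate_fqdns(hostname):
        try:
            with socket.create_connection((fqdn, 22), timeout=timeout):
                return "ok"
        except OSError:
            continue  # resolution or connection failed; try next suffix
    return "ko"
```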
---
## 5. Prod ↔ non-prod mapping
**Table**: `server_correspondance`
**Service**: `correspondance_service.py`
### 5.1 Automatic detection by hostname signature
```
SANEF naming rule:
  Position 1:  arbitrary prefix
  Position 2:  environment indicator
    p, s → Production
    r → Recette (UAT)
    t → Test
    i, o → Pre-production
    v → Validation
    d → Development
  Position 3+: application suffix
SIGNATURE = pos1 + "_" + pos3+
Example: "vpinfadns1" → signature "v_infadns1"
         "vdinfadns1" → signature "v_infadns1" (same signature)
         "vpinfadns1" = prod, "vdinfadns1" = dev → LINK
Exceptions (no auto-detection):
  - Hostnames starting with "ls-" or "sp"
FOR each signature:
  prods = [servers with env_char ∈ {p, s}]
  nonprods = [servers with env_char ∈ {r, t, i, v, d, o}]
  IF 1 prod AND N nonprods:
    → create N links (prod_server_id, nonprod_server_id, source='auto')
  IF 0 prods: orphans (no link)
  IF >1 prod: ambiguous (skip)
```
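A minimal sketch of the signature rule under the naming convention above (`signature` and `env_char` are illustrative names, not the service's actual functions):

```python
# Environment-indicator characters, as documented.
PROD_CHARS = {"p", "s"}
NONPROD_CHARS = {"r", "t", "i", "v", "d", "o"}

def signature(hostname: str):
    """pos1 + '_' + pos3+; None for hostnames excluded from auto-detection."""
    if hostname.startswith("ls-") or hostname.startswith("sp"):
        return None  # excluded prefixes
    return f"{hostname[0]}_{hostname[2:]}"

def env_char(hostname: str) -> str:
    """Second character encodes the environment."""
    return hostname[1]
```

Servers sharing a signature are candidate prod/non-prod pairs; the environment character decides which side each one lands on.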
### 5.2 Manual link
```sql
INSERT INTO server_correspondance
(prod_server_id, nonprod_server_id, environment_code, source, created_by)
VALUES (:prod_id, :nonprod_id, :env_code, 'manual', :user_id)
```
---
## 6. Post-patching validation
**Table**: `patch_validation`
**Route**: `/patching/validations`
### 6.1 Lifecycle
```
                   ┌──────────────────┐
Patching done →    │    en_attente    │ ← application owner notified
                   └────────┬─────────┘
          ┌─────────────────┼─────────────────┐
          ▼                 ▼                 ▼
    validated_ok      validated_ko          forced
    (OK, all clear)   (problem detected)    (forced by an admin)
```
### 6.2 Prod gating rule
```python
from sqlalchemy import text

def can_patch_prod(db, prod_server_id):
    """A prod server may be patched only if ALL of its linked non-prods are validated."""
    nonprods = db.execute(text(
        "SELECT nonprod_server_id FROM server_correspondance "
        "WHERE prod_server_id = :prod_id"), {"prod_id": prod_server_id}).fetchall()
    if not nonprods:
        return True, []  # orphan: no link, no gate
    blockers = []
    for np in nonprods:
        last_status = db.execute(text(
            "SELECT status FROM patch_validation WHERE server_id = :sid "
            "ORDER BY patch_date DESC LIMIT 1"), {"sid": np.nonprod_server_id}).scalar()
        if last_status not in ('validated_ok', 'forced'):
            blockers.append(np.nonprod_server_id)  # this non-prod is not validated
    return (not blockers), blockers  # (all_validated, blocker list)
```
---
## 7. Execution — Standard campaign mode
**Table**: `patch_sessions`
### 7.1 State machine
```
pending
  ├─→ excluded    (prereq auto-exclusion OR manual exclusion)
  ├─→ prereq_ok   (prerequisites validated)
  └─→ in_progress (operator starts patching)
        ├─→ patched  (success)
        ├─→ failed   (failure)
        └─→ reported (validated + postponed)
excluded → restorable by an admin
patched  → terminal (creates patch_validation)
failed   → terminal (investigation)
reported → terminal
```
### 7.2 Execution order
```
Monday    → non-prod servers 1–27
Tuesday   → non-prod servers 28–54
          → application owners notified
Wednesday → non-prod validation (owner validates OK/KO)
          → IF all non-prods OK → green light for prod
Wednesday → prod servers 1–27
Thursday  → prod servers 28–54
```
---
## 8. Execution — QuickWin mode (semi-automatic)
**Tables**: `quickwin_runs`, `quickwin_entries`, `quickwin_logs`
**Services**: `quickwin_service.py`, `quickwin_prereq_service.py`, `quickwin_snapshot_service.py`
### 8.1 Run state machine
```
draft → prereq → snapshot → patching → result → completed
  ↑                                                │
  └──────────────── revert to draft ───────────────┘
```
### 8.2 Phase 1: Run creation
```python
# Input:  year, week, label, server_ids
# Output: run_id + quickwin_entries
reboot_pkgs = get_secret("patching_reboot_packages")
# kernel*, glibc*, systemd*, dbus*, polkit*, linux-firmware*,
# microcode_ctl*, tuned*, dracut*, grub2*, kexec-tools*,
# libselinux*, selinux-policy*, shim*, mokutil*,
# net-snmp*, NetworkManager*, network-scripts*, nss*, openssl-libs*
FOR each server_id:
    branch = "prod" if environment == 'Production' else "hprod"
    INSERT INTO quickwin_entries
      (run_id, server_id, branch, status='pending',
       general_excludes=reboot_pkgs, specific_excludes=server.patch_excludes)
```
### 8.3 Phase 2: Prerequisites (SSE streaming)
```
FOR each entry (requested branch, not excluded):
  1. DNS RESOLUTION
     Environment detected from the hostname's 2nd character:
       prod/preprod: try sanef.groupe then sanef-rec.fr
       recette/test: try sanef-rec.fr then sanef.groupe
     socket.getaddrinfo(fqdn, 22) → IF resolved: OK
  2. SSH CONNECTION (fallback chain)
     IF ssh_method = "ssh_psmp":
       TRY:      PSMP connection (psmp.sanef.fr, user="CYBP01336@cybsecope@{fqdn}")
       FALLBACK: direct SSH-key connection
     ELSE:
       TRY:      direct SSH-key connection
       FALLBACK: PSMP connection
  3. DISK SPACE (over SSH)
     sudo df / /var --output=target,pcent
     IF usage >= 90% → disk_ok = false
  4. SATELLITE / YUM
     sudo subscription-manager status
     OR sudo yum repolist
     IF 0 repos → satellite_ok = false
  5. RESULT
     prereq_ok = dns AND ssh AND satellite AND disk
     UPDATE quickwin_entries SET prereq_ok, prereq_detail, prereq_date
     EMIT SSE event → real-time display in the browser
```
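The fallback chain in step 2 reduces to an ordering decision plus the PSMP connection string quoted above; a sketch with illustrative function names:

```python
def ssh_attempt_order(ssh_method: str):
    """Connection attempts in order: PSMP first only when the server is PSMP-managed."""
    if ssh_method == "ssh_psmp":
        return ["psmp", "direct_key"]
    return ["direct_key", "psmp"]

def psmp_user(fqdn: str) -> str:
    """PSMP user string, as quoted in the document."""
    return f"CYBP01336@cybsecope@{fqdn}"
```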
### 8.4 Phase 3: Snapshots (VMs only)
```
vCenter order depends on the branch:
  hprod: Senlis (vpgesavcs1) → Nanterre (vpmetavcs1) → DR (vpsicavcs1)
  prod:  Nanterre → Senlis → DR
FOR each entry (prereq_ok = true):
  IF machine_type = 'physical':
    snap_done = true (no snapshot; check the Commvault backup)
    CONTINUE
  FOR each vCenter in order:
    pyVmomi connection → look up the VM by name (vcenter_vm_name or hostname)
    IF found:
      snap_name = f"QW_{run_id}_{branch}_{YYYYMMDD_HHMM}"
      Create snapshot (memory=false, quiesce=true)
      snap_done = true
      BREAK
  IF not found on any vCenter:
    snap_done = false → LOG ERROR
  EMIT SSE event
```
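The branch-dependent vCenter order and the snapshot naming can be sketched as below (`VC_ORDER` is an illustrative structure built from the hostnames in the document, not the service's actual code):

```python
from datetime import datetime

# Search order per branch: hprod starts at Senlis, prod at Nanterre.
VC_ORDER = {
    "hprod": ["vpgesavcs1", "vpmetavcs1", "vpsicavcs1"],  # Senlis → Nanterre → DR
    "prod":  ["vpmetavcs1", "vpgesavcs1", "vpsicavcs1"],  # Nanterre → Senlis → DR
}

def snapshot_name(run_id: int, branch: str, now: datetime) -> str:
    """Snapshot name in the documented QW_<run>_<branch>_<timestamp> format."""
    return f"QW_{run_id}_{branch}_{now:%Y%m%d_%H%M}"
```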
### 8.5 Phase 4: Patching (SSE streaming)
```
FOR each entry (snap_done = true, requested branch):
  1. BUILD THE YUM COMMAND
     excludes = parse(general_excludes + " " + specific_excludes)
     args = " ".join(f"--exclude={pkg}" for pkg in excludes)
     cmd = f"yum update -y {args}"
     Example:
       yum update -y --exclude=kernel* --exclude=glibc* --exclude=systemd*
  2. SSH EXECUTION
     SSH connection (same chain as the prerequisites)
     stdin, stdout, stderr = client.exec_command(cmd, timeout=600)
     output = stdout.read().decode('utf-8')
     exit_code = stdout.channel.recv_exit_status()
  3. OUTPUT ANALYSIS
     Packages counted: "Updating", "Installing", "Upgrading" lines
     Nothing to do:    "Rien à faire" or "Nothing to do"
     Reboot required:  "kernel" or "reboot" in output
  4. UPDATE
     status = "patched" if exit_code == 0 else "failed"
     UPDATE quickwin_entries SET
       status, patch_output, patch_packages_count,
       patch_packages, reboot_required, patch_date
  5. VALIDATION CREATION
     IF status = "patched":
       INSERT INTO patch_validation
         (server_id, campaign_id=run_id, campaign_type='quickwin',
          patch_date=now(), status='en_attente')
  EMIT SSE event {hostname, ok, packages, reboot, detail}
```
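Step 1's command construction can be sketched as a single helper (the name `build_yum_command` is an assumption for illustration):

```python
def build_yum_command(general_excludes: str, specific_excludes: str) -> str:
    """Merge the two space-separated exclude lists into one yum update command."""
    pkgs = (general_excludes + " " + (specific_excludes or "")).split()
    args = " ".join(f"--exclude={p}" for p in pkgs)
    return f"yum update -y {args}".strip()
```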
### 8.6 Phase 5: Moving to prod
```
BEFORE starting prod patching:
  can_start_prod(db, run_id):
    SELECT COUNT(*) FROM quickwin_entries
    WHERE run_id = :rid AND branch = 'hprod'
      AND status IN ('pending', 'in_progress')
    IF count > 0 → BLOCKED (hprod not finished)
  check_prod_validations(db, run_id):
    FOR each prod entry:
      Check that all linked non-prods are validated_ok/forced
    IF blockers > 0 → BLOCKED (missing validation)
```
---
## 9. Post-patching
### 9.1 History record
```sql
-- After every patch (standard or quickwin)
INSERT INTO patch_history
  (server_id, campaign_id, intervenant_id, date_patch, status, notes, intervenant_name)
VALUES (:sid, :cid, :uid, now(), 'ok'/'ko', :notes, :interv_name)
```
### 9.2 Notifications
```
Triggers:
  - Patching started → notif_debut_sent = true
  - Reboot done      → notif_reboot_sent = true
  - Patching ended   → notif_fin_sent = true
Channel: Teams webhook (configurable in settings)
```
### 9.3 Rollback
```
IF status = 'failed' AND rollback_method = 'snapshot':
  → restore the vCenter snapshot (manually or via quickwin_snapshot_service)
  → record rollback_justif in patch_sessions
```
---
## 10. Reporting
### 10.1 Dashboard KPIs
| KPI | Query |
|-----|-------|
| Servers patched in 2026 | `COUNT(DISTINCT server_id) FROM patch_history WHERE year=2026` |
| Patching events 2026 | `COUNT(*) FROM patch_history WHERE year=2026` |
| Never patched (prod) | Production + secops servers with no patch_history entry this year |
| Coverage % | patched / patchable × 100 |
| Last week | `MAX(TO_CHAR(date_patch, 'IW'))` |
### 10.2 History (`/patching/historique`)
Unified sources:
- `patch_history` (xlsx imports + standard campaigns)
- `quickwin_entries` WHERE status='patched'
Filters: year, week, OS, zone, domain, operator, source, hostname
### 10.3 Integrations
| Source | Usage |
|--------|-------|
| iTop | Servers, contacts, applications, domains, environments |
| Qualys | Assets, V3 tags, agents, post-patch scans |
| CyberArk | PSMP access for SSH to restricted servers |
| Sentinel One | Endpoint agents (coverage comparison) |
| AD SANEF | secops group (8 users), LDAP auth |
---
## 11. Data model
```
patch_planning ──→ campaigns ──→ patch_sessions ──→ patch_history
  (annual)          (weekly)     (1 per server)      (audit trail)
                                 patch_validation
                                 (non-prod → prod gate)
quickwin_runs ──→ quickwin_entries ────┘
  (run)           (1 per server)
                  quickwin_logs
                  (detailed traces)
servers ──→ domain_environments ──→ domains
  │                │                environments
  │                │
  ├──→ zones
  ├──→ server_correspondance (prod ↔ non-prod)
  ├──→ server_ips
  ├──→ server_databases
  ├──→ qualys_assets ──→ qualys_asset_tags ──→ qualys_tags
  └──→ applications (via application_id)
users ──→ contacts (via contact_id FK)
  │          │
  │          └──→ ldap_dn (AD source)
  ├──→ default_assignments (assignment rules)
  └──→ campaign_operator_limits
```
---
## 12. Package exclusions (yum --exclude)
### 12.1 Reboot packages (global)
```
kernel*, glibc*, systemd*, dbus*, polkit*, linux-firmware*,
microcode_ctl*, tuned*, dracut*, grub2*, kexec-tools*,
libselinux*, selinux-policy*, shim*, mokutil*,
net-snmp*, NetworkManager*, network-scripts*, nss*, openssl-libs*
```
Stored in `app_secrets['patching_reboot_packages']`.
Applied to `quickwin_entries.general_excludes` at run creation.
### 12.2 Per-server exclusions
`servers.patch_excludes` field (free text, space-separated).
Managed via `/patching/config-exclusions` (UI + bulk).
Synced with iTop (best-effort).
### 12.3 Generated command
```bash
yum update -y --exclude=kernel* --exclude=glibc* --exclude=systemd* \
  --exclude=<specific1> --exclude=<specific2>
```


@ -1,3 +1,9 @@
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
"""PatchCenter v2 — Entry point FastAPI"""
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
@ -6,7 +12,7 @@ from starlette.middleware.base import BaseHTTPMiddleware
from .config import APP_NAME, APP_VERSION
from .dependencies import get_current_user, get_user_perms
from .database import SessionLocal, SessionLocalDemo
from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications
from .routers import auth, dashboard, servers, settings, users, campaigns, planning, specifics, audit, contacts, qualys, qualys_tags, quickwin, referentiel, patching, applications, patch_history, duty
class PermissionsMiddleware(BaseHTTPMiddleware):
@ -64,6 +70,8 @@ app.include_router(qualys_tags.router)
app.include_router(quickwin.router)
app.include_router(referentiel.router)
app.include_router(patching.router)
app.include_router(patch_history.router)
app.include_router(duty.router)
app.include_router(applications.router)


@ -117,6 +117,9 @@ async def audit_detail(request: Request, audit_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorisé</p>")
from ..dependencies import get_user_perms, can_view
if not can_view(get_user_perms(db, user), "audit"):
return HTMLResponse("<p>Non autorisé</p>")
entry = db.execute(text("SELECT * FROM server_audit WHERE id = :id"),
{"id": audit_id}).fetchone()
if not entry:


@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
"""Router campagnes — creation, prereqs, assignation, workflow"""
from datetime import datetime
from fastapi import APIRouter, Request, Depends, Query, Form


@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
"""Router contacts — gestion des responsables applicatifs et scopes"""
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse


@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
from fastapi import APIRouter, Request, Depends
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates

app/routers/duty.py (new file, 57 lines)

@ -0,0 +1,57 @@
"""Router Tour de garde SecOps"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import text
from ..dependencies import get_db, get_current_user
from ..config import APP_NAME
router = APIRouter()
templates = Jinja2Templates(directory="app/templates")
@router.get("/duty", response_class=HTMLResponse)
async def duty_page(request: Request, db=Depends(get_db),
year: str = Query("")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
from datetime import datetime
year = int(year) if year and year.isdigit() else datetime.now().year
current_week = datetime.now().isocalendar()[1]
current_year = datetime.now().year
rows = db.execute(text("""
SELECT * FROM secops_duty
WHERE year = :y
ORDER BY week_number
"""), {"y": year}).fetchall()
years = db.execute(text("""
SELECT DISTINCT year FROM secops_duty ORDER BY year DESC
""")).fetchall()
# Competences (hardcoded pour l'instant, pourra etre en DB plus tard)
competences = [
{"nom": "Ayoub", "s1": True, "commvault": False, "m365": True, "symantec": True},
{"nom": "Mouaad", "s1": True, "commvault": False, "m365": True, "symantec": True},
{"nom": "Khalid", "s1": True, "commvault": False, "m365": True, "symantec": True},
{"nom": "Thierno", "s1": True, "commvault": True, "m365": True, "symantec": True},
{"nom": "Paul", "s1": True, "commvault": True, "m365": True, "symantec": True},
]
# Stats : qui a le plus de gardes
stats = {}
for r in rows:
for field in ["tdg_s1", "tdg_symantec", "tdg_m365", "tdg_dmz"]:
name = getattr(r, field, None)
if name:
stats[name] = stats.get(name, 0) + 1
return templates.TemplateResponse("duty.html", {
"request": request, "user": user, "app_name": APP_NAME,
"rows": rows, "year": year, "years": [y.year for y in years],
"current_week": current_week, "current_year": current_year,
"competences": competences, "stats": stats,
})


@ -1,4 +1,4 @@
"""Router Historique patching — vue de patch_history"""
"""Router Historique patching — vue unifiee patch_history + quickwin_entries"""
from fastapi import APIRouter, Request, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
@ -12,27 +12,50 @@ templates = Jinja2Templates(directory="app/templates")
@router.get("/patching/historique", response_class=HTMLResponse)
async def patch_history_page(request: Request, db=Depends(get_db),
year: int = Query(None), week: int = Query(None),
hostname: str = Query(None), page: int = Query(1)):
year: str = Query(""), week: str = Query(""),
hostname: str = Query(""), source: str = Query(""),
os_family: str = Query(""), zone: str = Query(""),
domain: str = Query(""), intervenant: str = Query(""),
page: str = Query("1")):
user = get_current_user(request)
if not user:
return RedirectResponse(url="/login")
from datetime import datetime
if not year:
year = datetime.now().year
year = int(year) if year and year.isdigit() else datetime.now().year
week = int(week) if week and week.isdigit() else None
page = int(page) if page and page.isdigit() else 1
hostname = hostname.strip() or None
source = source.strip() or None
os_family = os_family.strip() or None
zone = zone.strip() or None
domain = domain.strip() or None
intervenant = intervenant.strip() or None
per_page = 100
offset = (page - 1) * per_page
# KPIs
kpis = {}
kpis["total"] = db.execute(text(
kpis["total_ph"] = db.execute(text(
"SELECT COUNT(*) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
kpis["servers"] = db.execute(text(
"SELECT COUNT(DISTINCT server_id) FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
kpis["total_qw"] = db.execute(text("""
SELECT COUNT(*) FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
"""), {"y": year}).scalar()
kpis["total"] = kpis["total_ph"] + kpis["total_qw"]
kpis["servers"] = db.execute(text("""
SELECT COUNT(DISTINCT sid) FROM (
SELECT server_id AS sid FROM patch_history WHERE EXTRACT(YEAR FROM date_patch)=:y
UNION
SELECT qe.server_id FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
) u
"""), {"y": year}).scalar()
kpis["patchables"] = db.execute(text(
"SELECT COUNT(*) FROM servers WHERE etat='Production' AND patch_os_owner='secops'"
)).scalar()
@ -41,56 +64,172 @@ async def patch_history_page(request: Request, db=Depends(get_db),
WHERE s.etat='Production' AND s.patch_os_owner='secops'
AND NOT EXISTS (SELECT 1 FROM patch_history ph
WHERE ph.server_id=s.id AND EXTRACT(YEAR FROM ph.date_patch)=:y)
AND NOT EXISTS (SELECT 1 FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.server_id=s.id AND qe.status='patched' AND qr.year=:y)
"""), {"y": year}).scalar()
kpis["coverage_pct"] = round((kpis["servers"] / kpis["patchables"] * 100), 1) if kpis["patchables"] else 0
# Par semaine
by_source = {}
by_source["import"] = db.execute(text(
"SELECT COUNT(*) FROM patch_history WHERE campaign_id IS NULL AND EXTRACT(YEAR FROM date_patch)=:y"
), {"y": year}).scalar()
by_source["standard"] = db.execute(text("""
SELECT COUNT(*) FROM patch_history ph
JOIN campaigns c ON ph.campaign_id=c.id
WHERE c.campaign_type='standard' AND EXTRACT(YEAR FROM ph.date_patch)=:y
"""), {"y": year}).scalar()
by_source["quickwin"] = kpis["total_qw"]
by_week = db.execute(text("""
SELECT TO_CHAR(date_patch, 'IW') as week_num,
COUNT(DISTINCT server_id) as servers
FROM patch_history
WHERE EXTRACT(YEAR FROM date_patch)=:y
GROUP BY TO_CHAR(date_patch, 'IW')
ORDER BY week_num
SELECT week_num, SUM(cnt)::int as servers FROM (
SELECT TO_CHAR(date_patch, 'IW') as week_num, COUNT(DISTINCT server_id) as cnt
FROM patch_history
WHERE EXTRACT(YEAR FROM date_patch)=:y
GROUP BY TO_CHAR(date_patch, 'IW')
UNION ALL
SELECT LPAD(qr.week_number::text, 2, '0') as week_num, COUNT(DISTINCT qe.server_id) as cnt
FROM quickwin_entries qe
JOIN quickwin_runs qr ON qe.run_id=qr.id
WHERE qe.status='patched' AND qr.year=:y
GROUP BY qr.week_number
) u GROUP BY week_num ORDER BY week_num
"""), {"y": year}).fetchall()
# Filtres
where = ["EXTRACT(YEAR FROM ph.date_patch)=:y"]
# Listes pour les filtres (selon annee courante)
filter_opts = {}
filter_opts["os"] = [r.os for r in db.execute(text("""
SELECT DISTINCT s.os_family as os FROM servers s
WHERE s.os_family IS NOT NULL AND s.os_family <> ''
ORDER BY 1
""")).fetchall()]
filter_opts["zones"] = [r.zone for r in db.execute(text("""
SELECT DISTINCT z.name as zone FROM zones z ORDER BY 1
""")).fetchall()]
filter_opts["domains"] = [r.dom for r in db.execute(text("""
SELECT DISTINCT d.name as dom FROM domains d ORDER BY 1
""")).fetchall()]
filter_opts["intervenants"] = [r.interv for r in db.execute(text("""
SELECT DISTINCT intervenant_name as interv FROM patch_history
WHERE intervenant_name IS NOT NULL AND intervenant_name <> ''
ORDER BY 1
""")).fetchall()]
where_ph = ["EXTRACT(YEAR FROM ph.date_patch)=:y"]
where_qw = ["qr.year=:y", "qe.status='patched'"]
params = {"y": year, "limit": per_page, "offset": offset}
if week:
where.append("EXTRACT(WEEK FROM ph.date_patch)=:wk")
where_ph.append("EXTRACT(WEEK FROM ph.date_patch)=:wk")
where_qw.append("qr.week_number=:wk")
params["wk"] = week
if hostname:
where.append("s.hostname ILIKE :h")
where_ph.append("s.hostname ILIKE :h")
where_qw.append("s.hostname ILIKE :h")
params["h"] = f"%{hostname}%"
wc = " AND ".join(where)
if os_family:
where_ph.append("s.os_family=:os")
where_qw.append("s.os_family=:os")
params["os"] = os_family
if zone:
where_ph.append("z.name=:zn")
where_qw.append("z.name=:zn")
params["zn"] = zone
if domain:
where_ph.append("d.name=:dm")
where_qw.append("d.name=:dm")
params["dm"] = domain
if intervenant:
where_ph.append("ph.intervenant_name=:iv")
where_qw.append("1=0") # quickwin n'a pas ce champ
params["iv"] = intervenant
if source == "import":
where_ph.append("ph.campaign_id IS NULL")
elif source == "standard":
where_ph.append("c.campaign_type='standard'")
total_filtered = db.execute(text(
f"SELECT COUNT(*) FROM patch_history ph JOIN servers s ON ph.server_id=s.id WHERE {wc}"
), params).scalar()
wc_ph = " AND ".join(where_ph)
wc_qw = " AND ".join(where_qw)
skip_qw = source in ("import", "standard") or bool(intervenant)
skip_ph = source == "quickwin"
ph_joins = """
JOIN servers s ON ph.server_id=s.id
LEFT JOIN zones z ON s.zone_id=z.id
LEFT JOIN domain_environments de ON s.domain_env_id=de.id
LEFT JOIN domains d ON de.domain_id=d.id
LEFT JOIN campaigns c ON ph.campaign_id=c.id
"""
qw_joins = """
JOIN quickwin_runs qr ON qe.run_id=qr.id
JOIN servers s ON qe.server_id=s.id
LEFT JOIN zones z ON s.zone_id=z.id
LEFT JOIN domain_environments de ON s.domain_env_id=de.id
LEFT JOIN domains d ON de.domain_id=d.id
"""
count_parts = []
if not skip_ph:
count_parts.append(f"SELECT COUNT(*) FROM patch_history ph {ph_joins} WHERE {wc_ph}")
if not skip_qw:
count_parts.append(f"SELECT COUNT(*) FROM quickwin_entries qe {qw_joins} WHERE {wc_qw}")
count_sql = " + ".join(f"({p})" for p in count_parts) if count_parts else "0"
total_filtered = db.execute(text(f"SELECT {count_sql}"), params).scalar()
union_parts = []
if not skip_ph:
union_parts.append(f"""
SELECT s.id as sid, s.hostname, s.os_family, s.etat,
ph.date_patch, ph.status, ph.notes, ph.intervenant_name,
z.name as zone, d.name as domain_name,
CASE WHEN ph.campaign_id IS NULL THEN 'import'
ELSE COALESCE(c.campaign_type, 'standard') END as source_type,
c.id as campaign_id, c.label as campaign_label,
NULL::int as run_id, NULL::text as run_label
FROM patch_history ph {ph_joins}
WHERE {wc_ph}
""")
if not skip_qw:
union_parts.append(f"""
SELECT s.id as sid, s.hostname, s.os_family, s.etat,
qe.patch_date as date_patch, qe.status, qe.notes,
NULL::text as intervenant_name,
z.name as zone, d.name as domain_name,
'quickwin' as source_type,
NULL::int as campaign_id, NULL::text as campaign_label,
qr.id as run_id, qr.label as run_label
FROM quickwin_entries qe {qw_joins}
WHERE {wc_qw}
""")
if not union_parts:
union_parts.append("""SELECT NULL::int as sid, NULL as hostname, NULL as os_family, NULL as etat,
NULL::timestamptz as date_patch, NULL as status, NULL as notes, NULL as intervenant_name,
NULL as zone, NULL as domain_name, NULL as source_type,
NULL::int as campaign_id, NULL as campaign_label, NULL::int as run_id, NULL as run_label
WHERE 1=0""")
union_sql = " UNION ALL ".join(union_parts)
rows = db.execute(text(f"""
    SELECT * FROM ({union_sql}) combined
    ORDER BY date_patch DESC NULLS LAST
    LIMIT :limit OFFSET :offset
"""), params).fetchall()
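The listing paginates over a single `UNION ALL` subquery instead of merging the two sources in Python. A minimal self-contained sketch of the same shape, with sqlite3 standing in for the real PostgreSQL session and the column list heavily simplified:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE patch_history (hostname TEXT, date_patch TEXT)")
conn.execute("CREATE TABLE quickwin_entries (hostname TEXT, patch_date TEXT)")
conn.executemany("INSERT INTO patch_history VALUES (?, ?)",
                 [("srv1", "2026-01-10"), ("srv2", "2026-02-01")])
conn.execute("INSERT INTO quickwin_entries VALUES (?, ?)", ("srv3", "2026-01-20"))

# Each source is normalized to the same column list and tagged with its
# source_type; the combined subquery is then ordered and paginated once.
rows = conn.execute("""
    SELECT * FROM (
        SELECT hostname, date_patch, 'import'   AS source_type FROM patch_history
        UNION ALL
        SELECT hostname, patch_date, 'quickwin' AS source_type FROM quickwin_entries
    ) combined
    ORDER BY date_patch DESC
    LIMIT :limit OFFSET :offset
""", {"limit": 2, "offset": 0}).fetchall()
print(rows)
```

Ordering and limiting the combined subquery keeps pagination correct across sources, which per-source `LIMIT`s would not.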
# Available years (both sources)
years = db.execute(text("""
    SELECT DISTINCT y FROM (
        SELECT EXTRACT(YEAR FROM date_patch)::int as y FROM patch_history
        UNION
        SELECT year as y FROM quickwin_runs
    ) u ORDER BY y DESC
""")).fetchall()
return templates.TemplateResponse("patch_history.html", {
    "request": request, "user": user, "app_name": APP_NAME,
    "kpis": kpis, "by_week": by_week, "by_source": by_source,
    "rows": rows, "year": year, "week": week, "hostname": hostname,
    "source": source, "os_family": os_family, "zone": zone,
    "domain": domain, "intervenant": intervenant,
    "filter_opts": filter_opts, "page": page, "per_page": per_page,
    "total_filtered": total_filtered, "years": [y.y for y in years],
})

View File

@@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
"""Servers router: CRUD + detail + edit via HTMX"""
from fastapi import APIRouter, Request, Depends, Query, Form
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse
@@ -98,6 +100,9 @@ async def server_detail(request: Request, server_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
from ..dependencies import get_user_perms, can_view
if not can_view(get_user_perms(db, user), "servers"):
return HTMLResponse("<p>Non autorise</p>")
s = get_server_full(db, server_id)
if not s:
return HTMLResponse("<p>Serveur non trouve</p>")
@@ -115,6 +120,9 @@ async def server_edit(request: Request, server_id: int, db=Depends(get_db)):
user = get_current_user(request)
if not user:
return HTMLResponse("<p>Non autorise</p>")
from ..dependencies import get_user_perms, can_edit
if not can_edit(get_user_perms(db, user), "servers"):
return HTMLResponse("<p>Non autorise</p>")
s = get_server_full(db, server_id)
if not s:
return HTMLResponse("<p>Serveur non trouve</p>")
@@ -199,8 +207,8 @@ async def server_update(request: Request, server_id: int, db=Depends(get_db),
vm_id = list(r["objects"].values())[0]["key"]
new_list = [{"applicationsolution_id": int(app_itop_id)}] if app_itop_id else []
client.update("VirtualMachine", vm_id, {"applicationsolution_list": new_list})
except Exception as e:
    logger.warning(f"Erreur non bloquante: {e}")
s = get_server_full(db, server_id)
tags = get_server_tags(db, s.qid)

View File

@@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
"""Settings router: external-module configuration + connections"""
from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse, RedirectResponse

View File

@@ -144,63 +144,63 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
except Exception:
pass
# ─── 1. Typologies: environments (batch) ───
existing_envs = {r.name.lower() for r in db.execute(text("SELECT name FROM environments")).fetchall()}
for item in client.get_all("Environnement", "name"):
    name = item.get("name", "")
    if not name or name.lower() in existing_envs:
        continue
    try:
        db.execute(text("INSERT INTO environments (name, code) VALUES (:n, :c)"),
                   {"n": name, "c": name[:10].upper().replace(" ", "").replace("-", "")})
        existing_envs.add(name.lower())
        stats["environments"] += 1
    except Exception:
        db.rollback()
db.commit()
# ─── 2. Typologies: application domains (batch) ───
existing_doms = {r.name.lower() for r in db.execute(text("SELECT name FROM domains")).fetchall()}
for item in client.get_all("DomaineApplicatif", "name"):
    name = item.get("name", "")
    if not name or name.lower() in existing_doms:
        continue
    try:
        db.execute(text("INSERT INTO domains (name, code) VALUES (:n, :c)"),
                   {"n": name, "c": name[:10].upper().replace(" ", "")})
        existing_doms.add(name.lower())
        stats["domains"] += 1
    except Exception:
        db.rollback()
db.commit()
# ─── 3. Typologies: zones (batch) ───
existing_zones = {r.name.lower() for r in db.execute(text("SELECT name FROM zones")).fetchall()}
for item in client.get_all("Zone", "name"):
    name = item.get("name", "")
    if not name or name.lower() in existing_zones:
        continue
    try:
        db.execute(text("INSERT INTO zones (name, is_dmz) VALUES (:n, :d)"),
                   {"n": name, "d": "dmz" in name.lower()})
        existing_zones.add(name.lower())
        stats["zones"] += 1
    except Exception:
        db.rollback()
db.commit()
# ─── 4. Typologies: DomainLdap → domain_ltd_list (batch) ───
existing_ltd = {r.name.lower() for r in db.execute(text("SELECT name FROM domain_ltd_list")).fetchall()}
for item in client.get_all("DomainLdap", "name"):
    name = item.get("name", "")
    if not name or name.lower() in existing_ltd:
        continue
    try:
        db.execute(text("INSERT INTO domain_ltd_list (name) VALUES (:n)"), {"n": name})
        existing_ltd.add(name.lower())
    except Exception:
        db.rollback()
db.commit()
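The four loops above all apply the same N+1 fix: pre-load the existing names into a set, dedupe in memory, and commit once after the loop instead of per row. A stand-alone sketch of that pattern, with sqlite3 and a made-up payload standing in for the real PostgreSQL session and iTop data:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE environments (name TEXT UNIQUE)")
conn.executemany("INSERT INTO environments (name) VALUES (?)",
                 [("Production",), ("Recette",)])
conn.commit()

incoming = ["Production", "Preprod", "preprod", "", "Dev"]  # illustrative payload

# One SELECT up front (instead of one per item) and one commit at the end
# (instead of one per insert): the two halves of the N+1 fix.
existing = {row[0].lower() for row in conn.execute("SELECT name FROM environments")}
added = 0
for name in incoming:
    if not name or name.lower() in existing:
        continue  # empty or already known (case-insensitive), as in the diff
    conn.execute("INSERT INTO environments (name) VALUES (?)", (name,))
    existing.add(name.lower())
    added += 1
conn.commit()
print(added)
```

Adding each inserted name to the in-memory set also dedupes within the incoming batch itself ("Preprod" vs "preprod"), which the per-row SELECT version only caught after its intermediate commits.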
# ─── 5. Contacts + Teams (IT scope filter only) ───
persons = client.get_all("Person", "name,first_name,email,phone,org_name,function,status")
@@ -233,6 +233,9 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
seen_emails = set()
stats["contacts_deactivated"] = 0
contacts_by_email = {}
for r in db.execute(text("SELECT id, email FROM contacts")).fetchall():
contacts_by_email[r.email.lower()] = r
for p in persons:
fullname = f"{p.get('first_name','')} {p.get('name','')}".strip()
email = p.get("email", "")
@@ -254,7 +257,7 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
seen_itop_ids.add(int(itop_id))
seen_emails.add(email.lower())
existing = contacts_by_email.get(email.lower())
if existing:
db.execute(text("""UPDATE contacts SET name=:n, role=:r, itop_id=:iid,
telephone=:tel, team=:t, function=:f, is_active=:a, updated_at=NOW() WHERE id=:id"""),
@@ -292,6 +295,9 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
# Person name → email lookup
person_email = {}
contacts_by_email = {}
for r in db.execute(text("SELECT id, email FROM contacts")).fetchall():
contacts_by_email[r.email.lower()] = r
for p in persons:
fullname = f"{p.get('first_name','')} {p.get('name','')}".strip()
person_email[fullname.lower()] = p.get("email", "")

View File

@@ -0,0 +1,32 @@
"""Reusable SQL helpers to avoid duplicating JOINs."""
from sqlalchemy import text
SERVER_BASE_JOINS = """
LEFT JOIN domain_environments de ON s.domain_env_id = de.id
LEFT JOIN domains d ON de.domain_id = d.id
LEFT JOIN environments e ON de.environment_id = e.id
LEFT JOIN zones z ON s.zone_id = z.id
"""
SERVER_BASE_COLS = """
s.id, s.hostname, s.fqdn, s.os_family, s.os_version,
s.etat, s.tier, s.environnement, s.machine_type,
d.name as domaine, d.code as domain_code,
e.name as env_name, e.code as env_code,
z.name as zone
"""
def server_list_query(where="1=1", order="s.hostname", limit=None, offset=None):
    """Builds a server SELECT query with the standard JOINs."""
    q = f"""
        SELECT {SERVER_BASE_COLS}
        FROM servers s
        {SERVER_BASE_JOINS}
        WHERE {where}
        ORDER BY {order}
    """
    if limit is not None:
        q += f" LIMIT {int(limit)}"
    if offset is not None:
        q += f" OFFSET {int(offset)}"
    return text(q)
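A sketch of how a router might call this helper. The constants and function are restated here abridged, returning a plain string instead of sqlalchemy's `text()` so the example runs without the app package; note that only `limit`/`offset` are sanitized through `int()`, so `where` and `order` must still come from trusted code, never from request input:

```python
SERVER_BASE_COLS = "s.id, s.hostname, s.os_family"           # abridged from the module above
SERVER_BASE_JOINS = "LEFT JOIN zones z ON s.zone_id = z.id"  # abridged from the module above

def server_list_query(where="1=1", order="s.hostname", limit=None, offset=None):
    # Same logic as the helper above, minus the sqlalchemy text() wrapper.
    q = (f"SELECT {SERVER_BASE_COLS} FROM servers s {SERVER_BASE_JOINS} "
         f"WHERE {where} ORDER BY {order}")
    if limit is not None:
        q += f" LIMIT {int(limit)}"       # int() makes a hostile limit fail loudly
    if offset is not None:
        q += f" OFFSET {int(offset)}"
    return q

q = server_list_query(where="s.os_family = :os", limit=25, offset=50)
print(q)

# A non-numeric limit never reaches the SQL string:
rejected = False
try:
    server_list_query(limit="25; DROP TABLE servers")
except ValueError:
    rejected = True
print(rejected)
```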

View File

@@ -89,6 +89,8 @@
{% if p.campaigns %}<a href="/campaigns" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'campaigns' in path and 'assignments' not in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Campagnes</a>{% endif %}
{% if p.servers in ('edit','admin') or p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/config-exclusions" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if 'config-exclusions' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Config exclusions</a>{% endif %}
{% if p.campaigns in ('edit','admin') or p.quickwin in ('edit','admin') %}<a href="/patching/validations" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/validations' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Validations</a>{% endif %}
<a href="/patching/historique" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/patching/historique' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Historique</a>
<a href="/duty" class="block px-3 py-1.5 rounded-md text-xs hover:bg-cyber-border/30 {% if '/duty' in path %}bg-cyber-border/30 text-cyber-accent{% else %}text-gray-400{% endif %} pl-6">Tour de garde</a>
{# Quickwin sous-groupe #}
{% if p.campaigns or p.quickwin %}

app/templates/duty.html Normal file
View File

@@ -0,0 +1,125 @@
{% extends 'base.html' %}
{% block title %}Tour de garde SecOps{% endblock %}
{% block content %}
<div class="flex justify-between items-center mb-4">
<div>
<h2 class="text-xl font-bold text-cyber-accent">Tour de garde SecOps</h2>
<p class="text-xs text-gray-500 mt-1">Planning hebdomadaire des astreintes et responsabilites.</p>
</div>
<div class="flex gap-2">
{% for y in years %}<a href="?year={{ y }}" class="btn-sm {% if y == year %}bg-cyber-accent text-black{% else %}bg-cyber-border text-gray-300{% endif %} px-3 py-1 text-xs">{{ y }}</a>{% endfor %}
</div>
</div>
<!-- Semaine en cours -->
{% for r in rows %}
{% if r.week_number == current_week and year == current_year %}
<div class="card p-4 mb-4" style="border-left:4px solid #00ff88;">
<div class="flex justify-between items-center mb-2">
<h3 class="text-sm font-bold text-cyber-green">Cette semaine — {{ r.week_code }}</h3>
<span class="text-xs text-gray-500">{{ r.week_start.strftime('%d/%m') if r.week_start else '' }} → {{ r.week_end.strftime('%d/%m/%Y') if r.week_end else '' }}</span>
</div>
{% if r.absences %}<p class="text-xs text-cyber-yellow mb-2">Absences : {{ r.absences }}</p>{% endif %}
<div style="display:flex;flex-wrap:wrap;gap:8px;">
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-cyan-400">{{ r.tdg_s1 or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">Sentinel One</div>
</div>
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-purple-400">{{ r.tdg_symantec or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">Symantec</div>
</div>
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-blue-400">{{ r.tdg_m365 or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">M365</div>
</div>
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-green-400">{{ r.tdg_commvault or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">Commvault</div>
</div>
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-red-400">{{ r.tdg_dmz or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">DMZ</div>
</div>
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-yellow-400">{{ r.tdg_meteo or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">Meteo</div>
</div>
<div class="card p-2 text-center" style="flex:1;min-width:0;background:#111827">
<div class="text-sm font-bold text-orange-400">{{ r.tdg_incident_majeur or '-' }}</div>
<div style="font-size:10px" class="text-gray-500">Incident Maj.</div>
</div>
</div>
</div>
{% endif %}
{% endfor %}
<!-- Competences -->
<div class="card p-4 mb-4">
<h3 class="text-sm font-bold text-cyber-accent mb-3">Matrice competences</h3>
<table class="w-full table-cyber text-xs">
<thead><tr>
<th class="p-2 text-left">Nom</th>
<th class="p-2 text-center">S1</th>
<th class="p-2 text-center">Commvault</th>
<th class="p-2 text-center">M365</th>
<th class="p-2 text-center">Symantec</th>
<th class="p-2 text-center">Gardes {{ year }}</th>
</tr></thead>
<tbody>
{% for c in competences %}
<tr class="border-t border-cyber-border/30">
<td class="p-2 font-bold text-cyber-accent">{{ c.nom }}</td>
<td class="p-2 text-center">{% if c.s1 %}<span class="text-cyber-green">✓</span>{% else %}<span class="text-gray-600">✗</span>{% endif %}</td>
<td class="p-2 text-center">{% if c.commvault %}<span class="text-cyber-green">✓</span>{% else %}<span class="text-gray-600">✗</span>{% endif %}</td>
<td class="p-2 text-center">{% if c.m365 %}<span class="text-cyber-green">✓</span>{% else %}<span class="text-gray-600">✗</span>{% endif %}</td>
<td class="p-2 text-center">{% if c.symantec %}<span class="text-cyber-green">✓</span>{% else %}<span class="text-gray-600">✗</span>{% endif %}</td>
<td class="p-2 text-center text-cyber-accent font-bold">{{ stats.get(c.nom, 0) }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<!-- Tableau complet -->
<div class="card overflow-x-auto">
<table class="w-full table-cyber text-xs">
<thead><tr>
<th class="p-2 text-center" style="min-width:40px">Sem.</th>
<th class="p-2 text-left" style="min-width:100px">Dates</th>
<th class="p-2 text-left" style="min-width:120px">Absences</th>
<th class="p-2 text-center">S1</th>
<th class="p-2 text-center">Symantec</th>
<th class="p-2 text-center">M365</th>
<th class="p-2 text-center">Commvault</th>
<th class="p-2 text-center">Meteo</th>
<th class="p-2 text-center">DMZ</th>
<th class="p-2 text-center">SafeNet</th>
<th class="p-2 text-center">Quarant.</th>
<th class="p-2 text-center">Inc. Maj.</th>
</tr></thead>
<tbody>
{% for r in rows %}
<tr class="border-t border-cyber-border/30 {% if r.week_number == current_week and year == current_year %}bg-cyber-accent/10{% endif %}" {% if r.week_number == current_week and year == current_year %}id="current"{% endif %}>
<td class="p-2 text-center font-bold {% if r.week_number == current_week and year == current_year %}text-cyber-green{% else %}text-gray-400{% endif %}">{{ r.week_code }}</td>
<td class="p-2 text-gray-400" style="font-size:10px">{{ r.week_start.strftime('%d/%m') if r.week_start else '' }} → {{ r.week_end.strftime('%d/%m') if r.week_end else '' }}</td>
<td class="p-2 text-cyber-yellow" style="font-size:10px;max-width:150px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap" title="{{ r.absences or '' }}">{{ r.absences or '' }}</td>
<td class="p-2 text-center text-cyan-400">{{ r.tdg_s1 or '-' }}</td>
<td class="p-2 text-center text-purple-400">{{ r.tdg_symantec or '-' }}</td>
<td class="p-2 text-center text-blue-400">{{ r.tdg_m365 or '-' }}</td>
<td class="p-2 text-center text-green-400">{{ r.tdg_commvault or '-' }}</td>
<td class="p-2 text-center text-yellow-400">{{ r.tdg_meteo or '-' }}</td>
<td class="p-2 text-center text-red-400">{{ r.tdg_dmz or '-' }}</td>
<td class="p-2 text-center text-gray-300">{{ r.tdg_safenet or '-' }}</td>
<td class="p-2 text-center text-gray-300">{{ r.tdg_quarantaine or '-' }}</td>
<td class="p-2 text-center text-orange-400">{{ r.tdg_incident_majeur or '-' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<script>
document.getElementById('current')?.scrollIntoView({behavior:'smooth', block:'center'});
</script>
{% endblock %}

app/templates/macros.html Normal file
View File

@@ -0,0 +1,33 @@
{% macro badge_etat(etat) -%}
<span class="badge {% if etat == 'Production' %}badge-green{% elif etat == 'Recette' %}badge-yellow{% elif etat == 'Decommissioned' %}badge-red{% else %}badge-gray{% endif %}" title="{{ etat or '' }}">{{ (etat or '-')[:6] }}</span>
{%- endmacro %}
{% macro badge_env(env) -%}
<span class="badge {% if env == 'Production' %}badge-green{% elif env == 'Recette' %}badge-yellow{% else %}badge-gray{% endif %}" title="{{ env or '' }}">{{ (env or '-')[:6] }}</span>
{%- endmacro %}
{% macro badge_os(os) -%}
<span class="text-gray-400">{{ (os or '-')[:6] }}</span>
{%- endmacro %}
{% macro badge_zone(zone) -%}
<span class="badge {% if zone == 'DMZ' %}badge-red{% else %}badge-gray{% endif %}">{{ zone or '-' }}</span>
{%- endmacro %}
{% macro badge_status(status) -%}
<span class="badge {% if status in ('ok','patched','validated_ok','active','open') %}badge-green{% elif status in ('ko','failed','validated_ko','closed','freeze') %}badge-red{% elif status in ('en_attente','pending','wip','in_progress') %}badge-yellow{% else %}badge-gray{% endif %}">{{ status or '-' }}</span>
{%- endmacro %}
{% macro badge_source(source_type, campaign_id=None, campaign_label=None, run_id=None, run_label=None) -%}
{% if source_type == 'import' %}<span class="badge" style="background:#1e3a5f;color:#60a5fa;">xlsx</span>
{% elif source_type == 'standard' %}<a href="/campaigns/{{ campaign_id }}" class="badge" style="background:#164e63;color:#22d3ee;text-decoration:none">{{ campaign_label or 'Campagne' }}</a>
{% elif source_type == 'quickwin' %}<a href="/quickwin/{{ run_id }}" class="badge" style="background:#3b1f5e;color:#c084fc;text-decoration:none">{{ run_label or 'QuickWin' }}</a>
{% else %}<span class="badge badge-gray">{{ source_type or '?' }}</span>{% endif %}
{%- endmacro %}
{% macro filter_select(name, label, options, selected) -%}
<select name="{{ name }}" class="text-xs py-1 px-2">
<option value="">{{ label }}</option>
{% for o in options %}<option value="{{ o }}" {% if selected == o %}selected{% endif %}>{{ o }}</option>{% endfor %}
</select>
{%- endmacro %}

View File

@@ -0,0 +1,160 @@
{% extends 'base.html' %}
{% block title %}Historique patching{% endblock %}
{% block content %}
<div class="flex justify-between items-center mb-4">
<div>
<h2 class="text-xl font-bold text-cyber-accent">Historique patching</h2>
<p class="text-xs text-gray-500 mt-1">Vue unifiée : imports xlsx + campagnes standard + QuickWin.</p>
</div>
<div class="flex gap-2">
{% for y in years %}<a href="?year={{ y }}" class="btn-sm {% if y == year %}bg-cyber-accent text-black{% else %}bg-cyber-border text-gray-300{% endif %} px-3 py-1 text-xs">{{ y }}</a>{% endfor %}
</div>
</div>
<!-- KPIs -->
<div style="display:flex;flex-wrap:wrap;gap:8px;margin-bottom:16px;">
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyber-accent">{{ kpis.total }}</div>
<div class="text-xs text-gray-500">Events {{ year }}</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyber-green">{{ kpis.servers }}</div>
<div class="text-xs text-gray-500">Serveurs distincts</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-white">{{ kpis.patchables }}</div>
<div class="text-xs text-gray-500">Patchables SecOps</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold {% if kpis.never > 0 %}text-cyber-red{% else %}text-cyber-green{% endif %}">{{ kpis.never }}</div>
<div class="text-xs text-gray-500">Jamais patchés {{ year }}</div>
</div>
<div class="card p-3 text-center" style="flex:1;min-width:0">
<div class="text-2xl font-bold {% if kpis.coverage_pct >= 80 %}text-cyber-green{% elif kpis.coverage_pct >= 50 %}text-cyber-yellow{% else %}text-cyber-red{% endif %}">{{ kpis.coverage_pct }}%</div>
<div class="text-xs text-gray-500">Couverture</div>
</div>
</div>
<!-- Répartition par source -->
<div style="display:flex;flex-wrap:wrap;gap:8px;margin-bottom:16px;">
<a href="?year={{ year }}&source=import" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-blue-400">{{ by_source.import }}</div>
<div class="text-xs text-gray-500">Import xlsx</div>
</a>
<a href="?year={{ year }}&source=standard" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-cyan-400">{{ by_source.standard }}</div>
<div class="text-xs text-gray-500">Campagnes standard</div>
</a>
<a href="?year={{ year }}&source=quickwin" class="card p-3 text-center hover:border-cyber-accent" style="flex:1;min-width:0">
<div class="text-2xl font-bold text-purple-400">{{ by_source.quickwin }}</div>
<div class="text-xs text-gray-500">QuickWin</div>
</a>
</div>
<!-- Graphique par semaine -->
{% if by_week %}
<div class="card p-4 mb-4">
<h3 class="text-sm font-bold text-cyber-accent mb-3">Serveurs patchés par semaine ({{ year }})</h3>
<div style="display:flex;align-items:flex-end;gap:2px;height:120px;">
{% set max_val = by_week|map(attribute='servers')|max %}
{% for w in by_week %}
<a href="?year={{ year }}&week={{ w.week_num|int }}" title="S{{ w.week_num }} : {{ w.servers }} serveur(s)" style="flex:1;display:flex;flex-direction:column;align-items:center;min-width:0;">
<div style="width:100%;background:{% if week and week == w.week_num|int %}#00ff88{% elif w.servers >= 30 %}#06b6d4{% elif w.servers >= 15 %}#0e7490{% else %}#164e63{% endif %};border-radius:2px 2px 0 0;height:{{ (w.servers / max_val * 100)|int if max_val else 0 }}px;min-height:2px;"></div>
<span style="font-size:8px;color:#6b7280;margin-top:2px;">{{ w.week_num }}</span>
</a>
{% endfor %}
</div>
</div>
{% endif %}
<!-- Filtres -->
<div class="card p-3 mb-4">
<form method="GET" class="flex gap-2 items-center flex-wrap">
<input type="hidden" name="year" value="{{ year }}">
<select name="week" class="text-xs py-1 px-2">
<option value="">Toutes semaines</option>
{% for w in by_week %}<option value="{{ w.week_num|int }}" {% if week == w.week_num|int %}selected{% endif %}>S{{ w.week_num }} ({{ w.servers }})</option>{% endfor %}
</select>
<select name="source" class="text-xs py-1 px-2">
<option value="">Toutes sources</option>
<option value="import" {% if source == 'import' %}selected{% endif %}>Import xlsx</option>
<option value="standard" {% if source == 'standard' %}selected{% endif %}>Campagne std</option>
<option value="quickwin" {% if source == 'quickwin' %}selected{% endif %}>QuickWin</option>
</select>
<select name="os_family" class="text-xs py-1 px-2">
<option value="">Tous OS</option>
{% for o in filter_opts.os %}<option value="{{ o }}" {% if os_family == o %}selected{% endif %}>{{ o }}</option>{% endfor %}
</select>
<select name="zone" class="text-xs py-1 px-2">
<option value="">Toutes zones</option>
{% for z in filter_opts.zones %}<option value="{{ z }}" {% if zone == z %}selected{% endif %}>{{ z }}</option>{% endfor %}
</select>
<select name="domain" class="text-xs py-1 px-2">
<option value="">Tous domaines</option>
{% for d in filter_opts.domains %}<option value="{{ d }}" {% if domain == d %}selected{% endif %}>{{ d }}</option>{% endfor %}
</select>
<select name="intervenant" class="text-xs py-1 px-2">
<option value="">Tous intervenants</option>
{% for i in filter_opts.intervenants %}<option value="{{ i }}" {% if intervenant == i %}selected{% endif %}>{{ i }}</option>{% endfor %}
</select>
<input type="text" name="hostname" value="{{ hostname or '' }}" placeholder="Hostname..." class="text-xs py-1 px-2" style="width:140px">
<button type="submit" class="btn-primary px-3 py-1 text-xs">Filtrer</button>
<a href="/patching/historique?year={{ year }}" class="text-xs text-gray-500 hover:text-cyber-accent">Reset</a>
<span class="text-xs text-gray-500 ml-auto">{{ total_filtered }} résultat{{ 's' if total_filtered != 1 }}</span>
</form>
</div>
<!-- Tableau -->
<div class="card overflow-x-auto">
<table class="w-full table-cyber text-xs">
<thead><tr>
<th class="p-2 text-left">Hostname</th>
<th class="p-2 text-center">OS</th>
<th class="p-2 text-center">Zone</th>
<th class="p-2 text-center">Domaine</th>
<th class="p-2 text-center">État</th>
<th class="p-2 text-center">Date</th>
<th class="p-2 text-center">Sem.</th>
<th class="p-2 text-center">Intervenant</th>
<th class="p-2 text-center">Source</th>
<th class="p-2 text-center">Status</th>
<th class="p-2 text-left">Notes</th>
</tr></thead>
<tbody>
{% for r in rows %}
<tr class="border-t border-cyber-border/30 hover:bg-cyber-hover/20">
<td class="p-2 font-mono text-cyber-accent"><a href="/servers/{{ r.sid }}" class="hover:underline">{{ r.hostname }}</a></td>
<td class="p-2 text-center text-gray-400">{{ (r.os_family or '-')[:6] }}</td>
<td class="p-2 text-center"><span class="badge {% if r.zone == 'DMZ' %}badge-red{% else %}badge-gray{% endif %}">{{ r.zone or '-' }}</span></td>
<td class="p-2 text-center text-gray-300">{{ (r.domain_name or '-')[:10] }}</td>
<td class="p-2 text-center"><span class="badge {% if r.etat == 'Production' %}badge-green{% else %}badge-yellow{% endif %}">{{ (r.etat or '-')[:6] }}</span></td>
<td class="p-2 text-center text-gray-300">{{ r.date_patch.strftime('%Y-%m-%d %H:%M') if r.date_patch else '-' }}</td>
<td class="p-2 text-center text-gray-400">{% if r.date_patch %}S{{ r.date_patch.strftime('%V') }}{% else %}-{% endif %}</td>
<td class="p-2 text-center text-gray-300">{{ r.intervenant_name or '-' }}</td>
<td class="p-2 text-center">
{% if r.source_type == 'import' %}<span class="badge" style="background:#1e3a5f;color:#60a5fa;">xlsx</span>
{% elif r.source_type == 'standard' %}<a href="/campaigns/{{ r.campaign_id }}" class="badge" style="background:#164e63;color:#22d3ee;text-decoration:none">{{ r.campaign_label or 'Campagne' }}</a>
{% elif r.source_type == 'quickwin' %}<a href="/quickwin/{{ r.run_id }}" class="badge" style="background:#3b1f5e;color:#c084fc;text-decoration:none">{{ r.run_label or 'QuickWin' }}</a>
{% else %}<span class="badge badge-gray">{{ r.source_type or '?' }}</span>{% endif %}
</td>
<td class="p-2 text-center"><span class="badge {% if r.status == 'ok' or r.status == 'patched' %}badge-green{% elif r.status == 'ko' or r.status == 'failed' %}badge-red{% else %}badge-yellow{% endif %}">{{ r.status }}</span></td>
<td class="p-2 text-gray-400" style="max-width:180px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap" title="{{ r.notes or '' }}">{{ (r.notes or '-')[:40] }}</td>
</tr>
{% endfor %}
{% if not rows %}
<tr><td colspan="11" class="p-6 text-center text-gray-500">Aucun event de patching pour ce filtre</td></tr>
{% endif %}
</tbody>
</table>
</div>
<!-- Pagination -->
{% if total_filtered > per_page %}
<div class="flex justify-center gap-2 mt-4">
{% set qs = 'year=' ~ year ~ ('&week=' ~ week if week else '') ~ ('&source=' ~ source if source else '') ~ ('&os_family=' ~ os_family if os_family else '') ~ ('&zone=' ~ zone if zone else '') ~ ('&domain=' ~ domain if domain else '') ~ ('&intervenant=' ~ intervenant if intervenant else '') ~ ('&hostname=' ~ hostname if hostname else '') %}
{% if page > 1 %}<a href="?{{ qs }}&page={{ page - 1 }}" class="btn-sm bg-cyber-border text-gray-300 px-3 py-1 text-xs">← Précédent</a>{% endif %}
<span class="text-xs text-gray-500 py-1">Page {{ page }} / {{ ((total_filtered - 1) // per_page) + 1 }}</span>
{% if page * per_page < total_filtered %}<a href="?{{ qs }}&page={{ page + 1 }}" class="btn-sm bg-cyber-border text-gray-300 px-3 py-1 text-xs">Suivant →</a>{% endif %}
</div>
{% endif %}
{% endblock %}

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,10 @@
-- Migration 2026-04-17: add an intervenant_name column to patch_history
-- to store the free-text operator name coming from the xlsx (e.g. "Khalid", "Thierno")
-- with no FK to users (it does not necessarily match a patchcenter user)
BEGIN;
ALTER TABLE patch_history ADD COLUMN IF NOT EXISTS intervenant_name varchar(100);
CREATE INDEX IF NOT EXISTS idx_ph_intervenant_name ON patch_history (intervenant_name);
COMMIT;

View File

@@ -0,0 +1,44 @@
-- Migration 2026-04-17: link users ↔ contacts ↔ LDAP cleanly (FK + index)
--
-- Before: users.itop_person_id (int) points at iTop (not at contacts.id)
--         -> fragile indirect link between users and contacts via itop_id
--
-- After:  users.contact_id (proper FK to contacts.id)
--         contacts.ldap_dn (records the AD source when a contact comes from an LDAP import)
-- The 3 tables join directly: users.contact_id = contacts.id
-- The LDAP source is identified by contacts.ldap_dn IS NOT NULL and/or
-- users.auth_type = 'ldap'.
BEGIN;
-- 1. users.contact_id: FK to contacts.id
ALTER TABLE users ADD COLUMN IF NOT EXISTS contact_id INTEGER;
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_constraint WHERE conname = 'users_contact_id_fkey'
) THEN
ALTER TABLE users ADD CONSTRAINT users_contact_id_fkey
FOREIGN KEY (contact_id) REFERENCES contacts(id) ON DELETE SET NULL;
END IF;
END$$;
CREATE INDEX IF NOT EXISTS idx_users_contact_id ON users (contact_id);
-- 2. contacts.ldap_dn: records AD provenance
ALTER TABLE contacts ADD COLUMN IF NOT EXISTS ldap_dn varchar(500);
CREATE INDEX IF NOT EXISTS idx_contacts_ldap_dn ON contacts (ldap_dn)
WHERE ldap_dn IS NOT NULL;
-- 3. Backfill users.contact_id from users.email <-> contacts.email
--    (for already-present users whose email matches a contact)
UPDATE users u
SET contact_id = c.id
FROM contacts c
WHERE u.contact_id IS NULL
AND u.email IS NOT NULL
AND lower(u.email) = lower(c.email);
COMMENT ON COLUMN users.contact_id IS 'FK vers contacts.id — lien direct user ↔ contact (le meme email)';
COMMENT ON COLUMN contacts.ldap_dn IS 'DN AD d''ou provient ce contact (import LDAP). NULL si import iTop ou saisie manuelle';
COMMIT;
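The backfill in step 3 matches on lower-cased email and skips users that already have a contact or have no email. The same matching rule in miniature, with made-up sample rows standing in for the contacts and users tables:

```python
# Hypothetical sample data (ids and addresses are illustrative only).
contacts = {1: "Alice.Dupont@sanef.com", 2: "bob@sanef.com"}
users = [
    {"id": 10, "email": "alice.dupont@SANEF.com", "contact_id": None},
    {"id": 11, "email": None, "contact_id": None},  # no email -> left untouched
]

by_email = {email.lower(): cid for cid, email in contacts.items()}
for u in users:
    # Mirrors: WHERE u.contact_id IS NULL AND u.email IS NOT NULL
    #          AND lower(u.email) = lower(c.email)
    if u["contact_id"] is None and u["email"] is not None:
        u["contact_id"] = by_email.get(u["email"].lower())

print([u["contact_id"] for u in users])
```

Case-insensitive matching matters here because AD, iTop, and manual entry rarely agree on email casing.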

View File

@@ -0,0 +1,32 @@
-- Migration 2026-04-18: weekly SecOps duty-roster ("tour de garde") table
BEGIN;
CREATE TABLE IF NOT EXISTS secops_duty (
id serial PRIMARY KEY,
year smallint NOT NULL,
week_number smallint NOT NULL,
week_code varchar(5) NOT NULL,
week_start date,
week_end date,
absences text,
tdg_s1 varchar(50),
tdg_symantec varchar(50),
tdg_m365 varchar(50),
tdg_commvault varchar(50),
tdg_meteo varchar(50),
tdg_dmz varchar(50),
tdg_safenet varchar(50),
tdg_quarantaine varchar(50),
tdg_securisation varchar(50),
tdg_incident_majeur varchar(50),
tdg_incident_critique varchar(50),
emails_dest varchar(100),
created_at timestamptz DEFAULT now()
);
CREATE UNIQUE INDEX IF NOT EXISTS secops_duty_year_week_uniq ON secops_duty (year, week_number);
CREATE INDEX IF NOT EXISTS secops_duty_week_idx ON secops_duty (year, week_number);
COMMENT ON TABLE secops_duty IS 'Tour de garde SecOps hebdomadaire (source: Tour de garde secops_2026.xlsx)';
COMMIT;
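Not part of the migration itself, but relevant to any importer filling `week_start`/`week_end`: both dates follow directly from `(year, week_number)` by ISO-week arithmetic, e.g. in Python:

```python
from datetime import date, timedelta

def iso_week_bounds(year: int, week: int) -> tuple[date, date]:
    """Monday and Sunday of the given ISO week."""
    monday = date.fromisocalendar(year, week, 1)
    return monday, monday + timedelta(days=6)

# Week 16 of 2026 covers the commit dates above (2026-04-17 is that Friday).
print(iso_week_bounds(2026, 16))
```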

View File

@@ -0,0 +1,223 @@
"""Imports the members of an AD group into the users table + links them to contacts.

3 linked fields:
1. LDAP/AD (source: a specific AD group, e.g. CN=secops,...)
2. contacts (by email: match existing, create when absent)
3. users (by username=sAMAccountName, auth_type='ldap')
   + users.itop_person_id = contacts.itop_id (when a contact matches)

By default the target AD group is:
CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe

The LDAP config (server, bind DN, bind password, base DN) is read from app_secrets
via ldap_service.

Usage (must run from the SANEF workstation, since the AD is not reachable from the lab):
python tools/import_ldap_group_users.py
python tools/import_ldap_group_users.py --group "CN=secops,OU=...,DC=sanef,DC=groupe" --dry-run
"""
import os
import sys
import argparse
from pathlib import Path
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
DEFAULT_GROUP_DN = "CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe"
def get_ldap_config(engine):
"""Recupere la config LDAP depuis app_secrets (reutilise ldap_service)."""
from app.services.ldap_service import _get_config
with engine.connect() as conn:
return _get_config(conn)
def fetch_group_members(cfg, group_dn):
"""Retourne liste de dicts {username, name, email, dn}.
Strategie : bind service account -> search pour user dont memberOf contient group_dn.
Plus fiable que de lire group.member (limite 1500 DN par defaut).
"""
from ldap3 import Server, Connection, ALL, SUBTREE
use_ssl = cfg["server"].startswith("ldaps://")
server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl)
conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"],
auto_bind=True)
# LDAP filter: direct members of the group (includes admin accounts, even without a mail)
search_filter = (
f"(&(objectClass=user)(objectCategory=person)"
f"(memberOf={group_dn}))"
)
conn.search(cfg["base_dn"], search_filter, search_scope=SUBTREE,
attributes=["sAMAccountName", "displayName", "mail",
"userPrincipalName", "distinguishedName",
"userAccountControl"])
members = []
for entry in conn.entries:
sam = str(entry.sAMAccountName) if entry.sAMAccountName else None
if not sam:
print(f" [SKIP] Entry sans sAMAccountName : {entry.entry_dn}")
continue
# Email priority: mail > userPrincipalName > fallback sam@sanef.com
email = None
if entry.mail and str(entry.mail).strip():
email = str(entry.mail).strip().lower()
elif entry.userPrincipalName and str(entry.userPrincipalName).strip():
email = str(entry.userPrincipalName).strip().lower()
else:
email = f"{sam.lower()}@sanef.com"
print(f" [INFO] {sam} sans mail AD, fallback : {email}")
# Check whether the account is disabled (informational only)
uac = entry.userAccountControl.value if entry.userAccountControl else 0
if isinstance(uac, int) and uac & 0x2:
print(f" [WARN] {sam} compte AD DESACTIVE (UAC={uac}) — importe quand meme")
members.append({
"username": sam.lower(),
"display_name": str(entry.displayName) if entry.displayName else sam,
"email": email,
"dn": str(entry.entry_dn),
})
conn.unbind()
return members
SQL_FIND_CONTACT = text("""
SELECT id, itop_id FROM contacts WHERE lower(email) = :email LIMIT 1
""")
SQL_INSERT_CONTACT = text("""
INSERT INTO contacts (name, email, role, team, ldap_dn,
is_active, is_verified, created_at, updated_at)
VALUES (:name, :email, 'contact_technique', 'SecOps', :ldap_dn,
true, true, now(), now())
ON CONFLICT (email) DO UPDATE SET
name = EXCLUDED.name,
ldap_dn = EXCLUDED.ldap_dn,
updated_at = now()
RETURNING id, itop_id
""")
SQL_FIND_USER = text("""
SELECT id FROM users WHERE username = :username LIMIT 1
""")
SQL_INSERT_USER = text("""
INSERT INTO users (username, display_name, email, role, auth_type,
is_active, contact_id, itop_person_id,
created_at, updated_at)
VALUES (:username, :display_name, :email, 'operator', 'ldap',
true, :contact_id, :itop_pid,
now(), now())
""")
SQL_UPDATE_USER = text("""
UPDATE users SET
display_name = :display_name,
email = :email,
auth_type = 'ldap',
-- is_active PRESERVED: never reactivate a user who was disabled manually
contact_id = COALESCE(:contact_id, contact_id),
itop_person_id = COALESCE(:itop_pid, itop_person_id),
updated_at = now()
WHERE id = :uid
""")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--group", default=DEFAULT_GROUP_DN,
help=f"DN du groupe AD (defaut: {DEFAULT_GROUP_DN})")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
print(f"[INFO] Groupe cible : {args.group}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
cfg = get_ldap_config(engine)
if not cfg["enabled"]:
print("[ERR] LDAP desactive dans app_secrets (ldap_enabled != true).")
sys.exit(1)
if not cfg["server"] or not cfg["base_dn"]:
print("[ERR] LDAP non configure (server ou base_dn manquant).")
sys.exit(1)
print(f"[INFO] LDAP server : {cfg['server']} base_dn : {cfg['base_dn']}")
try:
members = fetch_group_members(cfg, args.group)
except Exception as e:
print(f"[ERR] LDAP search failed : {e}")
sys.exit(2)
print(f"[INFO] Membres AD retrouves : {len(members)}")
for m in members[:5]:
print(f" {m['username']:20s} {m['email']:40s} {m['display_name']}")
if len(members) > 5:
print(f" ... ({len(members) - 5} autres)")
if args.dry_run:
print("[DRY-RUN] Aucun write")
return
inserted_u = updated_u = created_c = linked_itop = linked_contact = 0
with engine.begin() as conn:
for m in members:
# 1. Contact: find or create (store ldap_dn to trace the AD source)
row = conn.execute(SQL_FIND_CONTACT, {"email": m["email"]}).fetchone()
if row:
contact_id, itop_id = row
# Update ldap_dn if the contact already exists without the trace
conn.execute(text(
"UPDATE contacts SET ldap_dn = COALESCE(ldap_dn, :dn) WHERE id = :cid"
), {"dn": m["dn"], "cid": contact_id})
else:
r = conn.execute(SQL_INSERT_CONTACT, {
"name": m["display_name"],
"email": m["email"],
"ldap_dn": m["dn"],
}).fetchone()
contact_id, itop_id = r
created_c += 1
# 2. User: upsert + contact_id link (FK) + itop_person_id (historical link)
u = conn.execute(SQL_FIND_USER, {"username": m["username"]}).fetchone()
params = {
"username": m["username"],
"display_name": m["display_name"],
"email": m["email"],
"contact_id": contact_id,
"itop_pid": itop_id, # None si contact sans itop_id
}
if u:
conn.execute(SQL_UPDATE_USER, {**params, "uid": u[0]})
updated_u += 1
else:
conn.execute(SQL_INSERT_USER, params)
inserted_u += 1
linked_contact += 1
if itop_id:
linked_itop += 1
print(f"[OK] Termine :")
print(f" users : INSERT {inserted_u} UPDATE {updated_u}")
print(f" contacts : CREATE {created_c}")
print(f" links : users.contact_id = {linked_contact}")
print(f" users.itop_person_id = {linked_itop}")
if __name__ == "__main__":
main()
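The email resolution above follows the priority mail > userPrincipalName > sam@sanef.com fallback. A standalone sketch of that chain, with attribute values as plain strings rather than ldap3 entry attributes (illustrative account names):

```python
def resolve_email(sam, mail, upn):
    """mail > userPrincipalName > sAMAccountName@sanef.com, normalized to lowercase."""
    for candidate in (mail, upn):
        if candidate and candidate.strip():
            return candidate.strip().lower()
    return f"{sam.lower()}@sanef.com"

print(resolve_email("jdupont", " J.Dupont@Sanef.com ", None))  # j.dupont@sanef.com
print(resolve_email("adm-jdupont", None, ""))                  # adm-jdupont@sanef.com
```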

View File

@ -0,0 +1,275 @@
"""Import historique patching depuis Plan de Patching serveurs 2026.xlsx (SOURCE DE VERITE).
Perimetre : 2025 + 2026 uniquement.
- Histo-2025 (cols L/M = 1er sem, O/P = 2eme sem)
- S02..S52 (weekly 2026 : nom de cellule VERT = patche)
Regles :
- Weekly sheets : cellule du nom (col A) AVEC FOND VERT = serveur patche
- Date : col N (14) ; Heure : col O (15)
- Si date manque -> lundi de la semaine (ISO) ; si heure manque -> 00:00
- La semaine est toujours derivee du nom de sheet (S02..S52) ou de date_patch
Usage :
python tools/import_plan_patching_xlsx.py [xlsx] [--truncate] [--dry-run]
"""
import os
import re
import sys
import glob
import argparse
from datetime import datetime, time, date, timedelta
from pathlib import Path
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
# Normalize intervenant names (free-form xlsx -> canonical)
INTERVENANT_MAP = {
"sophie/joel": "Joel",
"joel/sophie": "Joel",
}
def normalize_intervenant(name):
if not name:
return None
s = str(name).strip()
return INTERVENANT_MAP.get(s.lower(), s)
def is_green(cell):
"""True si la cellule a un fond vert (dominante G > R et G > B)."""
if cell.fill is None or cell.fill.fgColor is None:
return False
fc = cell.fill.fgColor
rgb = None
if fc.type == "rgb" and fc.rgb:
rgb = fc.rgb.upper()
elif fc.type == "theme":
# Office theme colors 9/6 = green-ish accents
return fc.theme in (9, 6)
if not rgb or len(rgb) < 6:
return False
try:
rr = int(rgb[-6:-4], 16)
gg = int(rgb[-4:-2], 16)
bb = int(rgb[-2:], 16)
except ValueError:
return False
return gg > 120 and gg > rr + 30 and gg > bb + 30
def parse_week_num(sheet_name):
m = re.match(r"^[Ss](\d{1,2})$", sheet_name.strip())
return int(m.group(1)) if m else None
def monday_of_iso_week(year, week):
jan4 = date(year, 1, 4)
start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
return start
def parse_hour(val):
if val is None:
return None
if isinstance(val, time):
return val
if isinstance(val, datetime):
return val.time()
s = str(val).strip().lower().replace("h", ":")
m = re.match(r"(\d{1,2})(?::(\d{2}))?", s)
if not m:
return None
hh = int(m.group(1))
mm = int(m.group(2) or 0)
if 0 <= hh < 24 and 0 <= mm < 60:
return time(hh, mm)
return None
def parse_date_cell(val):
if val is None:
return None
if isinstance(val, datetime):
return val
if isinstance(val, date):
return datetime.combine(val, time(0, 0))
s = str(val).strip()
m = re.match(r"(\d{2})/(\d{2})/(\d{4})", s)
if m:
try:
return datetime(int(m.group(3)), int(m.group(2)), int(m.group(1)))
except Exception:
return None
return None
def find_xlsx():
for p in [
ROOT / "deploy" / "Plan de Patching serveurs 2026.xlsx",
ROOT / "deploy" / "Plan_de_Patching_serveurs_2026.xlsx",
]:
if p.exists():
return str(p)
hits = glob.glob(str(ROOT / "deploy" / "Plan*Patching*erveurs*2026*.xlsx"))
return hits[0] if hits else None
def collect_events(wb, hosts):
"""Retourne liste dicts patch_history : {sid, dt, status, notes}.
3 champs toujours renseignes : semaine (dans notes), date (date_patch::date),
heure (date_patch::time 00:00 si inconnue).
"""
events = []
stats = {"histo_2025_s1": 0, "histo_2025_s2": 0,
"weekly": 0, "no_server": 0, "weekly_no_color": 0}
# --- Histo-2025: col B (2) intervenant, col L (12) S1 date, col M (13) S1 flag, col O (15) S2 date, col P (16) S2 flag
if "Histo-2025" in wb.sheetnames:
ws = wb["Histo-2025"]
for row_idx in range(2, ws.max_row + 1):
hn = ws.cell(row=row_idx, column=1).value
if not hn:
continue
sid = hosts.get(str(hn).strip().lower())
if not sid:
stats["no_server"] += 1
continue
interv = ws.cell(row=row_idx, column=2).value
interv = str(interv).strip() if interv else None
date_s1 = parse_date_cell(ws.cell(row=row_idx, column=12).value)
flag_s1 = ws.cell(row=row_idx, column=13).value
if flag_s1 and isinstance(flag_s1, (int, float)) and flag_s1 >= 1: # openpyxl may yield int or float
dt = date_s1 or datetime(2025, 6, 30, 0, 0)
events.append({"sid": sid, "dt": dt, "status": "ok",
"notes": f"Histo-2025 S1 (x{flag_s1})",
"interv": interv})
stats["histo_2025_s1"] += 1
date_s2 = parse_date_cell(ws.cell(row=row_idx, column=15).value)
flag_s2 = ws.cell(row=row_idx, column=16).value
if flag_s2 and isinstance(flag_s2, (int, float)) and flag_s2 >= 1:
dt = date_s2 or datetime(2025, 12, 31, 0, 0)
events.append({"sid": sid, "dt": dt, "status": "ok",
"notes": f"Histo-2025 S2 (x{flag_s2})",
"interv": interv})
stats["histo_2025_s2"] += 1
# --- Weekly sheets S02..S52: GREEN hostname = patched (2026)
for sname in wb.sheetnames:
wk = parse_week_num(sname)
if wk is None or not (1 <= wk <= 53):
continue
ws = wb[sname]
fallback_monday = monday_of_iso_week(2026, wk)
for row_idx in range(2, ws.max_row + 1):
hn_cell = ws.cell(row=row_idx, column=1)
hn = hn_cell.value
if not hn or not any(c.isalpha() for c in str(hn)):
continue
if not is_green(hn_cell):
stats["weekly_no_color"] += 1
continue
hn_norm = str(hn).strip().split(".")[0].lower()
sid = hosts.get(hn_norm)
if not sid:
stats["no_server"] += 1
continue
interv = ws.cell(row=row_idx, column=2).value
interv = str(interv).strip() if interv else None
# col N (14) = date, col O (15) = time
date_val = ws.cell(row=row_idx, column=14).value
hour_val = ws.cell(row=row_idx, column=15).value
dt_base = parse_date_cell(date_val) or datetime.combine(fallback_monday, time(0, 0))
hr = parse_hour(hour_val)
if hr:
dt_base = datetime.combine(dt_base.date(), hr)
# else: time defaults to 00:00 (already in dt_base)
# Skip patch dates in the future (cells colored ahead of time)
if dt_base > datetime.now():
stats["weekly_future"] = stats.get("weekly_future", 0) + 1
continue
events.append({"sid": sid, "dt": dt_base, "status": "ok",
"notes": f"Semaine {wk:02d} 2026",
"interv": interv})
stats["weekly"] += 1
return events, stats
def main():
parser = argparse.ArgumentParser()
parser.add_argument("xlsx", nargs="?", default=None)
parser.add_argument("--truncate", action="store_true",
help="TRUNCATE patch_history avant import (source de verite)")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
xlsx = args.xlsx or find_xlsx()
if not xlsx or not os.path.exists(xlsx):
print("[ERR] Fichier Plan de Patching introuvable. Place-le dans deploy/.")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
wb = openpyxl.load_workbook(xlsx, data_only=True)
print(f"[INFO] Sheets: {', '.join(wb.sheetnames)}")
with engine.begin() as conn:
hosts = {}
for r in conn.execute(text("SELECT id, hostname FROM servers")).fetchall():
hosts[r.hostname.lower()] = r.id
print(f"[INFO] Servers en DB: {len(hosts)}")
events, stats = collect_events(wb, hosts)
print("[INFO] Events detectes:")
for k, v in stats.items():
print(f" {v:5d} {k}")
print(f"[INFO] TOTAL events: {len(events)}")
if args.dry_run:
print("[DRY-RUN] Aucun write")
return
if args.truncate:
print("[INFO] TRUNCATE patch_history RESTART IDENTITY CASCADE")
conn.execute(text("TRUNCATE TABLE patch_history RESTART IDENTITY CASCADE"))
inserted = skipped = 0
for ev in events:
existing = conn.execute(text(
"SELECT id FROM patch_history WHERE server_id=:sid AND date_patch=:dt"
), {"sid": ev["sid"], "dt": ev["dt"]}).fetchone()
if existing:
skipped += 1
continue
conn.execute(text("""
INSERT INTO patch_history (server_id, date_patch, status, notes, intervenant_name)
VALUES (:sid, :dt, :status, :notes, :interv)
"""), ev)
inserted += 1
print(f"[OK] INSERT: {inserted} | SKIP (doublon): {skipped}")
if __name__ == "__main__":
main()
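The green-fill detection above boils down to an RGB dominance test (G > 120 and G at least 30 above both R and B). A standalone sketch of the same heuristic on raw ARGB strings, with the theme-color branch omitted (sample colors are illustrative):

```python
def is_greenish(argb):
    """True when the green channel dominates: G > 120, G > R+30, G > B+30.
    argb is an 'AARRGGBB' or 'RRGGBB' hex string, as openpyxl exposes fgColor.rgb."""
    if not argb or len(argb) < 6:
        return False
    try:
        r = int(argb[-6:-4], 16)
        g = int(argb[-4:-2], 16)
        b = int(argb[-2:], 16)
    except ValueError:
        return False
    return g > 120 and g > r + 30 and g > b + 30

print(is_greenish("FF00B050"))  # True  (a standard Excel green)
print(is_greenish("FFFFFF00"))  # False (yellow: R equals G)
print(is_greenish("FFFFFFFF"))  # False (white)
```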

View File

@ -0,0 +1,213 @@
"""Import planning annuel patching depuis Planning Patching 2026_ayoub.xlsx feuille Planning.
Mapping colonnes feuille Planning :
A : domaine+env (ex Infrastructure HPROD, Peage PROD, FL Prod)
B : Patch N marker (cycle) OU semaine NN (ligne data)
C : plage dates DD/MM/YYYY ... DD/MM/YYYY OU Gel
D : ferie (datetime) OU Gel OU texte
Structure cible table patch_planning :
year, week_number, week_code, week_start, week_end, cycle,
domain_code (FK domains), env_scope, status, note
Usage :
python tools/import_planning_xlsx.py [chemin_fichier.xlsx]
"""
import os
import sys
import re
import glob
from pathlib import Path
from datetime import date, datetime, timedelta
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def parse_label(a):
"""Retourne liste (domain_code, env_scope) pour le libelle col A.
Un libelle peut mapper sur plusieurs domaines (ex BI + Gestion) ou
un scope combine (prod_pilot pour Peage HPROD / PROD Pilote).
"""
if not a:
return []
lo = a.lower()
if "bi" in lo and "gestion" in lo:
return [("BI", "all"), ("GESTION", "all")]
if "peage" in lo or "p\xe9age" in lo:
if "pilot" in lo:
return [("PEA", "prod_pilot")]
if "hprod" in lo:
return [("PEA", "hprod")]
if "prod" in lo:
return [("PEA", "prod")]
if "infrastructure" in lo:
if "hprod" in lo:
return [("INFRASTRUC", "hprod")]
return [("INFRASTRUC", "prod")]
if "trafic" in lo:
if "hprod" in lo:
return [("trafic", "hprod")]
return [("trafic", "prod")]
if lo.startswith("fl"):
if "pre-prod" in lo or "pr\xe9-prod" in lo or "preprod" in lo or "pr\xe9prod" in lo:
return [("FL", "pilot")]
if "prod" in lo and "pre" not in lo and "pr\xe9" not in lo:
return [("FL", "prod")]
return [("FL", "hprod")]
return []
def parse_dates(c_val, year):
"""Parse col C. Retourne (week_start, week_end, is_freeze)."""
if not c_val:
return None, None, False
s = str(c_val).strip()
if s.lower() == "gel":
return None, None, True
m = re.search(r"(\d{2})/(\d{2})/(\d{4}).*?(\d{2})/(\d{2})/(\d{4})", s)
if m:
d1 = date(int(m.group(3)), int(m.group(2)), int(m.group(1)))
d2 = date(int(m.group(6)), int(m.group(5)), int(m.group(4)))
return d1, d2, False
return None, None, False
def iso_week_dates(year, week):
"""Fallback : dates debut/fin semaine ISO depuis year+week."""
jan4 = date(year, 1, 4)
start = jan4 - timedelta(days=jan4.isoweekday() - 1) + timedelta(weeks=week - 1)
return start, start + timedelta(days=6)
def parse_note(d_val):
if d_val is None:
return None
if isinstance(d_val, (datetime, date)):
dd = d_val.date() if isinstance(d_val, datetime) else d_val
return f"Ferie : {dd.strftime('%d/%m/%Y')}"
s = str(d_val).strip()
if not s or s.lower() == "gel":
return None
return s
def parse_planning(xlsx_path, year_default=2026):
wb = openpyxl.load_workbook(xlsx_path, data_only=True)
if "Planning" not in wb.sheetnames:
raise SystemExit(f"[ERR] Sheet Planning introuvable. Sheets: {wb.sheetnames}")
ws = wb["Planning"]
rows = []
current_cycle = None
for row in ws.iter_rows(values_only=True):
a = row[0] if len(row) > 0 else None
b = row[1] if len(row) > 1 else None
c = row[2] if len(row) > 2 else None
d = row[3] if len(row) > 3 else None
if b and re.match(r"^\s*Patch\s+\d+\s*$", str(b), re.I):
m = re.search(r"\d+", str(b))
current_cycle = int(m.group(0)) if m else None
continue
if not b:
continue
m = re.match(r"^\s*semaine\s+(\d+)\s*$", str(b), re.I)
if not m:
continue
week_number = int(m.group(1))
year = year_default
week_code = f"S{week_number:02d}"
d1, d2, is_freeze = parse_dates(c, year)
if not d1:
d1, d2 = iso_week_dates(year, week_number)
note = parse_note(d)
if is_freeze and note is None:
note = "Gel"
if is_freeze:
status = "freeze"
else:
status = "open" if a else "empty"
targets = parse_label(a)
if not targets:
targets = [(None, "all")]
for dom, env in targets:
rows.append({
"year": year,
"week_number": week_number,
"week_code": week_code,
"week_start": d1,
"week_end": d2,
"cycle": current_cycle,
"domain_code": dom,
"env_scope": env,
"status": status,
"note": note,
})
return rows
SQL_INSERT = text("""
INSERT INTO patch_planning
(year, week_number, week_code, week_start, week_end, cycle,
domain_code, env_scope, status, note)
VALUES
(:year, :week_number, :week_code, :week_start, :week_end, :cycle,
:domain_code, :env_scope, :status, :note)
""")
def main():
if len(sys.argv) > 1:
xlsx = sys.argv[1]
else:
xlsx = None
for p in [
ROOT / "deploy" / "Planning Patching 2026_ayoub.xlsx",
ROOT / "deploy" / "Planning_Patching_2026_ayoub.xlsx",
]:
if p.exists():
xlsx = str(p)
break
if not xlsx:
candidates = glob.glob(str(ROOT / "deploy" / "*Planning*ayoub*.xlsx"))
xlsx = candidates[0] if candidates else None
if not xlsx or not os.path.exists(xlsx):
print("[ERR] Fichier Planning introuvable. Place-le dans deploy/ (ex: deploy/Planning Patching 2026_ayoub.xlsx)")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
rows = parse_planning(xlsx)
print(f"[INFO] Lignes parses: {len(rows)}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
inserted = 0
with engine.begin() as conn:
for r in rows:
conn.execute(SQL_INSERT, r)
inserted += 1
print(f"[OK] Termine - INSERT: {inserted}")
print("[INFO] Verifs :")
print(" SELECT week_code, domain_code, env_scope, status FROM patch_planning ORDER BY year, week_number, domain_code;")
if __name__ == "__main__":
main()
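parse_dates relies on a single regex to pull a DD/MM/YYYY ... DD/MM/YYYY pair out of col C, with anything else (including "Gel") yielding no dates. The same extraction as a standalone sketch (sample strings are illustrative):

```python
import re
from datetime import date

# First DD/MM/YYYY ... DD/MM/YYYY pair anywhere in the string.
RANGE_RE = re.compile(r"(\d{2})/(\d{2})/(\d{4}).*?(\d{2})/(\d{2})/(\d{4})")

def parse_range(s):
    """Return (start_date, end_date) or None when no range is present."""
    m = RANGE_RE.search(s)
    if not m:
        return None
    d1, mo1, y1, d2, mo2, y2 = (int(g) for g in m.groups())
    return date(y1, mo1, d1), date(y2, mo2, d2)

print(parse_range("du 05/01/2026 au 11/01/2026"))  # (datetime.date(2026, 1, 5), datetime.date(2026, 1, 11))
print(parse_range("Gel"))                          # None
```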

View File

@ -0,0 +1,159 @@
"""Import tour de garde SecOps depuis Tour de garde secops_2026.xlsx.
Lit la feuille 'Tour de garde', UPSERT dans secops_duty.
Usage:
python tools/import_tour_de_garde_xlsx.py [xlsx] [--truncate]
"""
import os
import sys
import re
import glob
from pathlib import Path
from datetime import date, datetime, timedelta
import openpyxl
from sqlalchemy import create_engine, text
ROOT = Path(__file__).resolve().parent.parent
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def parse_dates(c_val):
if not c_val:
return None, None
s = str(c_val).strip()
m = re.search(r"(\d{2})/(\d{2})/(\d{4}).*?(\d{2})/(\d{2})/(\d{4})", s)
if m:
d1 = date(int(m.group(3)), int(m.group(2)), int(m.group(1)))
d2 = date(int(m.group(6)), int(m.group(5)), int(m.group(4)))
return d1, d2
return None, None
def s(val):
if val is None:
return None
t = str(val).strip()
return t or None
def find_xlsx():
for p in [
ROOT / "deploy" / "Tour de garde secops_2026.xlsx",
ROOT / "deploy" / "Tour_de_garde_secops_2026.xlsx",
]:
if p.exists():
return str(p)
hits = glob.glob(str(ROOT / "deploy" / "*our*garde*.xlsx"))
return hits[0] if hits else None
def parse_tour_de_garde(xlsx_path):
wb = openpyxl.load_workbook(xlsx_path, data_only=True)
ws_name = next((n for n in wb.sheetnames if "garde" in n.lower()), None)
if not ws_name:
raise SystemExit(f"[ERR] Sheet 'Tour de garde' not found. Sheets: {wb.sheetnames}")
ws = wb[ws_name]
rows = []
for i, row in enumerate(ws.iter_rows(values_only=True)):
if i == 0:
continue
week_code = row[0]
if not week_code or not str(week_code).strip().startswith("S"):
continue
wc = str(week_code).strip()
m = re.match(r"S(\d+)", wc)
if not m:
continue
week_num = int(m.group(1))
d1, d2 = parse_dates(row[2])
year = d1.year if d1 and d1.month > 6 else (d2.year if d2 else 2026)
if d1 and d1.month == 12 and week_num <= 1:
year = d2.year if d2 else d1.year + 1
rows.append({
"year": year,
"week_number": week_num,
"week_code": wc,
"week_start": d1,
"week_end": d2,
"absences": s(row[1]),
"tdg_s1": s(row[3]),
"tdg_symantec": s(row[4]),
"tdg_m365": s(row[5]),
"emails_dest": s(row[6]),
"tdg_commvault": s(row[7]),
"tdg_meteo": s(row[8]),
"tdg_dmz": s(row[11]) if len(row) > 11 else None,
"tdg_safenet": s(row[12]) if len(row) > 12 else None,
"tdg_quarantaine": s(row[13]) if len(row) > 13 else None,
"tdg_securisation": s(row[14]) if len(row) > 14 else None,
"tdg_incident_majeur": s(row[16]) if len(row) > 16 else None,
"tdg_incident_critique": s(row[17]) if len(row) > 17 else None,
})
return rows
SQL_UPSERT = text("""
INSERT INTO secops_duty
(year, week_number, week_code, week_start, week_end, absences,
tdg_s1, tdg_symantec, tdg_m365, tdg_commvault, tdg_meteo,
tdg_dmz, tdg_safenet, tdg_quarantaine, tdg_securisation,
tdg_incident_majeur, tdg_incident_critique, emails_dest)
VALUES
(:year, :week_number, :week_code, :week_start, :week_end, :absences,
:tdg_s1, :tdg_symantec, :tdg_m365, :tdg_commvault, :tdg_meteo,
:tdg_dmz, :tdg_safenet, :tdg_quarantaine, :tdg_securisation,
:tdg_incident_majeur, :tdg_incident_critique, :emails_dest)
ON CONFLICT (year, week_number) DO UPDATE SET
week_code = EXCLUDED.week_code,
week_start = EXCLUDED.week_start,
week_end = EXCLUDED.week_end,
absences = EXCLUDED.absences,
tdg_s1 = EXCLUDED.tdg_s1,
tdg_symantec = EXCLUDED.tdg_symantec,
tdg_m365 = EXCLUDED.tdg_m365,
tdg_commvault = EXCLUDED.tdg_commvault,
tdg_meteo = EXCLUDED.tdg_meteo,
tdg_dmz = EXCLUDED.tdg_dmz,
tdg_safenet = EXCLUDED.tdg_safenet,
tdg_quarantaine = EXCLUDED.tdg_quarantaine,
tdg_securisation = EXCLUDED.tdg_securisation,
tdg_incident_majeur = EXCLUDED.tdg_incident_majeur,
tdg_incident_critique = EXCLUDED.tdg_incident_critique,
emails_dest = EXCLUDED.emails_dest
""")
def main():
xlsx = sys.argv[1] if len(sys.argv) > 1 else find_xlsx()
if not xlsx or not os.path.exists(xlsx):
print("[ERR] Fichier Tour de garde introuvable. Place-le dans deploy/")
sys.exit(1)
print(f"[INFO] Fichier: {xlsx}")
rows = parse_tour_de_garde(xlsx)
print(f"[INFO] Semaines parsees: {len(rows)}")
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
truncate = "--truncate" in sys.argv
with engine.begin() as conn:
if truncate:
conn.execute(text("TRUNCATE TABLE secops_duty RESTART IDENTITY"))
print("[INFO] TRUNCATE secops_duty")
for r in rows:
conn.execute(SQL_UPSERT, r)
print(f"[OK] UPSERT: {len(rows)} semaines")
if __name__ == "__main__":
main()
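Week S01 can start in the previous December, so the import infers the year from the parsed dates rather than trusting a single column. A sketch mirroring the import's year rule (the default year is an assumption taken from the script):

```python
from datetime import date

def infer_year(week_num, d1, d2, default=2026):
    """Mirror of the import's rule: prefer d1.year when the week starts in the
    second half of the year, except W01 starting in December, which belongs
    to the following ISO year."""
    year = d1.year if d1 and d1.month > 6 else (d2.year if d2 else default)
    if d1 and d1.month == 12 and week_num <= 1:
        year = d2.year if d2 else d1.year + 1
    return year

print(infer_year(1, date(2025, 12, 29), date(2026, 1, 4)))   # 2026 (S01 starts in Dec 2025)
print(infer_year(53, date(2026, 12, 28), date(2027, 1, 3)))  # 2026
```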

View File

@ -0,0 +1,141 @@
"""Verifie + etablit les 3 liens : patch_history <-> users <-> contacts.
Contexte :
- patch_history.intervenant_name : texte libre venant du xlsx (ex "Khalid", "Mouaad")
- users.id : FK cible pour patch_history.intervenant_id
- users.contact_id : FK vers contacts.id
- contacts.ldap_dn : trace source AD
Matching : on tente d'apparier patch_history.intervenant_name a users.display_name
(ex "Khalid" -> "MOUTAOUAKIL-ext Khalid (admin)") en cherchant le prenom comme token.
Usage :
python tools/link_patch_history_intervenants.py # verif seule
python tools/link_patch_history_intervenants.py --apply # UPDATE FK
"""
import os
import sys
import argparse
from sqlalchemy import create_engine, text
DATABASE_URL = (os.getenv("DATABASE_URL_DEMO")
or os.getenv("DATABASE_URL")
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db")
def report_state(conn):
print("\n=== ETAT ACTUEL DES 3 TABLES ===")
r = conn.execute(text("""
SELECT
(SELECT COUNT(*) FROM users) AS users_total,
(SELECT COUNT(*) FROM users WHERE auth_type='ldap') AS users_ldap,
(SELECT COUNT(*) FROM users WHERE contact_id IS NOT NULL) AS users_with_contact,
(SELECT COUNT(*) FROM contacts) AS contacts_total,
(SELECT COUNT(*) FROM contacts WHERE ldap_dn IS NOT NULL) AS contacts_with_ldap,
(SELECT COUNT(*) FROM patch_history) AS ph_total,
(SELECT COUNT(*) FROM patch_history WHERE intervenant_name IS NOT NULL) AS ph_with_name,
(SELECT COUNT(*) FROM patch_history WHERE intervenant_id IS NOT NULL) AS ph_with_user_fk
""")).fetchone()
print(f" users : total={r.users_total} | ldap={r.users_ldap} | lie contact={r.users_with_contact}")
print(f" contacts: total={r.contacts_total} | avec ldap_dn={r.contacts_with_ldap}")
print(f" patch_history : total={r.ph_total} | avec intervenant_name={r.ph_with_name} "
f"| avec intervenant_id (FK users)={r.ph_with_user_fk}")
print("\n=== DISTRIBUTION patch_history.intervenant_name ===")
for row in conn.execute(text("""
SELECT intervenant_name, COUNT(*) AS n
FROM patch_history WHERE intervenant_name IS NOT NULL
GROUP BY 1 ORDER BY 2 DESC
""")).fetchall():
print(f" {row.n:5d} {row.intervenant_name}")
print("\n=== USERS LDAP (candidats FK) ===")
for row in conn.execute(text("""
SELECT u.username, u.display_name, u.email, c.name AS contact_name,
CASE WHEN c.ldap_dn IS NOT NULL THEN 'LDAP' ELSE '-' END AS src
FROM users u LEFT JOIN contacts c ON u.contact_id=c.id
WHERE u.auth_type='ldap' ORDER BY u.username
""")).fetchall():
print(f" {row.username:15s} | {row.display_name or '-':45s} | {row.email:30s} | {row.src}")
def propose_mapping(conn):
"""Retourne dict {intervenant_name -> user_id} en matchant par prenom."""
users = conn.execute(text("""
SELECT id, username, display_name FROM users WHERE auth_type='ldap'
""")).fetchall()
names = conn.execute(text("""
SELECT DISTINCT intervenant_name FROM patch_history
WHERE intervenant_name IS NOT NULL
""")).fetchall()
mapping = {}
for name_row in names:
n = name_row.intervenant_name
if not n:
continue
n_lo = n.strip().lower()
# Exclusion matches: collective/team names
if n_lo in ("secops", "secops-team", "secops team"):
continue
candidates = []
for u in users:
dn = (u.display_name or "").lower()
# Token match : "khalid" dans "moutaouakil-ext khalid (admin)"
if f" {n_lo} " in f" {dn} " or dn.endswith(f" {n_lo}") or dn.startswith(f"{n_lo} "):
candidates.append(u)
# Joel case: display_name may contain "Joël" with the accent
elif n_lo == "joel" and ("joël" in dn or "joel" in dn):
candidates.append(u)
if len(candidates) == 1:
mapping[n] = candidates[0].id
elif len(candidates) > 1:
print(f" [AMBIG] '{n}' matche {len(candidates)} users : {[c.username for c in candidates]}")
else:
print(f" [MISS] '{n}' -> aucun user LDAP trouve (peut-etre pas dans groupe secops)")
return mapping
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--apply", action="store_true",
help="Applique vraiment le UPDATE FK (par defaut : dry-run verif)")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.rsplit('@', 1)[-1]}")
with engine.begin() as conn:
report_state(conn)
print("\n=== MATCHING intervenant_name -> users.id ===")
mapping = propose_mapping(conn)
print(f"\n {len(mapping)} correspondance(s) unique(s) trouvees :")
for name, uid in mapping.items():
u = conn.execute(text("SELECT display_name, username FROM users WHERE id=:i"),
{"i": uid}).fetchone()
print(f" '{name}' -> #{uid} {u.username} ({u.display_name})")
if not args.apply:
print("\n[DRY-RUN] Rien ecrit. Relance avec --apply pour UPDATE patch_history.intervenant_id")
return
print("\n=== APPLY : UPDATE patch_history.intervenant_id ===")
total_updated = 0
for name, uid in mapping.items():
r = conn.execute(text("""
UPDATE patch_history SET intervenant_id = :uid
WHERE intervenant_name = :name AND intervenant_id IS NULL
"""), {"uid": uid, "name": name})
print(f" '{name}' -> user #{uid} : {r.rowcount} lignes")
total_updated += r.rowcount
print(f"\n[OK] Total UPDATE : {total_updated} lignes")
# Re-check afterwards
print("\n=== STATE AFTER APPLY ===")
report_state(conn)
if __name__ == "__main__":
main()
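The matcher treats intervenant_name as a first-name token inside users.display_name; padding both strings with spaces covers the start, middle, and end positions in a single membership test, which is what the three-branch check above amounts to. A standalone sketch (sample names are illustrative):

```python
def token_match(first_name, display_name):
    """True when first_name appears as a whole, space-delimited word in
    display_name (case-insensitive), e.g. 'Khalid' in
    'MOUTAOUAKIL-ext Khalid (admin)'. Padding with spaces makes the
    start/middle/end cases one check."""
    needle = f" {first_name.strip().lower()} "
    haystack = f" {display_name.lower()} "
    return needle in haystack

print(token_match("Khalid", "MOUTAOUAKIL-ext Khalid (admin)"))  # True
print(token_match("Kha", "MOUTAOUAKIL-ext Khalid (admin)"))     # False (partial word)
```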