Python csv.DictReader ne garde que le dernier quand 2 colonnes ont le meme nom: le Nouveau (condition) ecrasait le Production (lifecycle). Switch vers csv.reader + lecture par indice de colonne (1ere occurrence de Etat = lifecycle).
169 lines
7.0 KiB
Python
169 lines
7.0 KiB
Python
"""Migration etat: lowercase codes -> labels iTop verbatim + import CSV.
|
|
|
|
Aligne la colonne servers.etat sur les valeurs iTop exactes:
|
|
Lifecycle: Production, Implémentation, Stock, Obsolète, EOL, prêt, tests
|
|
Condition: Nouveau, A récupérer, Cassé, Cédé, En panne, Perdu,
|
|
Recyclé, Occasion, A détruire, Volé
|
|
|
|
Etapes:
|
|
1. DROP ancien CHECK, UPDATE lowercase -> iTop verbatim
|
|
2. ADD nouveau CHECK avec labels iTop
|
|
3. Relecture CSV pour re-synchroniser depuis iTop
|
|
|
|
Usage:
|
|
python tools/import_etat_itop.py <csv1> [<csv2> ...] [--dry-run]
|
|
"""
|
|
import os
|
|
import csv
|
|
import argparse
|
|
from sqlalchemy import create_engine, text
|
|
|
|
# Connection string resolution order: DATABASE_URL_DEMO, then DATABASE_URL,
# then the literal default below.
# NOTE(review): the fallback URL embeds a password in the source file;
# consider requiring one of the environment variables instead.
DATABASE_URL = (
    os.getenv("DATABASE_URL_DEMO")
    or os.getenv("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
)
|
|
|
|
# iTop lifecycle states, verbatim (the single PatchCenter dropdown).
# Kept as a list: main() prints it as-is and builds the CHECK constraint from it.
ITOP_ETATS = [
    "Production",
    "Implémentation",
    "Stock",
    "Obsolète",
    "prêt",
    "tests",
]
|
|
|
|
# Migration of pre-existing values (old lowercase codes OR iTop "condition"
# labels) to either a verbatim lifecycle label or NULL (None).
# Insertion order matters: main() walks this mapping in order.
LEGACY_MAP = {
    # Lowercase codes from the old schema.
    "production": "Production",
    "implementation": "Implémentation",
    "stock": "Stock",
    "obsolete": "Obsolète",
    "pret": "prêt",
    "tests": "tests",
    # Verbatim labels map to themselves so that a re-run is idempotent.
    "Production": "Production",
    "Implémentation": "Implémentation",
    "Stock": "Stock",
    "Obsolète": "Obsolète",
    "prêt": "prêt",
    # iTop "condition" values are not lifecycle states -> NULL.
    # (Nouveau, Cassé, En panne, ...: the migration sets them to NULL and the
    # affected servers are expected to be re-synced from the CSV.)
    # NOTE(review): the original comment names a "Status" CSV column, while
    # main() reads the first "Etat" column — confirm which one is intended.
    **dict.fromkeys(
        (
            "eol", "EOL",
            "nouveau", "Nouveau",
            "casse", "Cassé",
            "cede", "Cédé",
            "en_panne", "En panne",
            "a_recuperer", "A récupérer",
            "perdu", "Perdu",
            "recycle", "Recyclé",
            "occasion", "Occasion",
            "a_detruire", "A détruire",
            "vole", "Volé",
        )
    ),
}
|
|
|
|
|
|
def norm_etat(raw):
    """Normalise a raw CSV cell to a verbatim iTop lifecycle label.

    Args:
        raw: Cell content; may be None or empty.

    Returns:
        The stripped value when it is one of ITOP_ETATS, otherwise None
        (empty cell, "-" / "(null)" placeholder, condition label, or any
        other unrecognised text).
    """
    cleaned = raw.strip() if raw else ""
    # Empty cells and iTop placeholders carry no state at all.
    if cleaned in ("", "-", "(null)"):
        return None
    if cleaned in ITOP_ETATS:
        return cleaned
    return None
|
|
|
|
|
|
# iTop "condition" labels (physical state of the asset). They are not
# lifecycle states: a CSV row carrying one of them resets servers.etat to NULL.
_ITOP_CONDITIONS = frozenset({
    "Nouveau", "Recyclé", "A récupérer", "Cassé", "Cédé", "En panne",
    "Perdu", "Occasion", "A détruire", "Volé",
})

# CSV cell values treated as "no value".
_EMPTY_MARKERS = ("-", "(null)")


def _first_index(header, names):
    """Return the index of the first header cell matching *names*, or -1.

    Cells are stripped before comparison so that stray whitespace around a
    column title does not hide the column.
    """
    return next((i for i, h in enumerate(header) if h.strip() in names), -1)


def _migrate_legacy_values(conn, dry_run):
    """Step 1: drop the old CHECK and rewrite legacy values per LEGACY_MAP.

    In dry-run mode only the per-value counts are printed; nothing is written.
    """
    print("[INFO] Drop ancien CHECK...")
    if not dry_run:
        conn.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_etat_check"))

    print("[INFO] Migration valeurs existantes -> lifecycle iTop ou NULL...")
    for old, new in LEGACY_MAP.items():
        cnt = conn.execute(text("SELECT COUNT(*) FROM servers WHERE etat=:o"), {"o": old}).scalar()
        if not cnt:
            continue
        label = new if new is not None else "NULL"
        print(f" {old:18s} -> {label:18s} : {cnt}")
        # Identity entries exist only for reporting; an UPDATE that sets the
        # same value would rewrite every matching row for nothing.
        if not dry_run and new != old:
            conn.execute(text("UPDATE servers SET etat=:n WHERE etat=:o"), {"n": new, "o": old})


def _apply_check_constraint(conn, dry_run):
    """Step 2: add the CHECK constraint restricting etat to ITOP_ETATS or NULL."""
    if dry_run:
        return
    # The labels come from the ITOP_ETATS constant, not from user input;
    # DDL cannot take bound parameters, hence the inline literals.
    allowed_sql = ", ".join(f"'{v}'" for v in ITOP_ETATS)
    conn.execute(text(
        f"ALTER TABLE servers ADD CONSTRAINT servers_etat_check "
        f"CHECK (etat IN ({allowed_sql}) OR etat IS NULL)"
    ))
    print(f"[INFO] CHECK iTop applique: {ITOP_ETATS}")


def _resync_from_csv(conn, csv_path, dry_run, stats, unknown):
    """Step 3: re-synchronise servers.etat from one iTop CSV export.

    Args:
        conn: Open database connection.
        csv_path: Path of the CSV file to read.
        dry_run: When true, print the planned changes instead of writing them.
        stats: Dict with "updated", "unchanged" and "not_found" counters;
            incremented in place.
        unknown: Set collecting unrecognised state labels; extended in place.
    """
    print(f"\n[INFO] Lecture {csv_path}")
    with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
        sample = f.read(4096)
        f.seek(0)
        # Pick whichever of ";" and "," is more frequent in the first 4 KiB.
        delim = ";" if sample.count(";") > sample.count(",") else ","
        reader = csv.reader(f, delimiter=delim)
        # A default avoids StopIteration on a completely empty file.
        header = next(reader, None)
        rows = list(reader)

    if header is None:
        print("[WARN] Fichier CSV vide, skip.")
        return

    # iTop sometimes exports two "Etat" columns (lifecycle + physical
    # condition). The FIRST occurrence is the lifecycle one.
    idx_etat = _first_index(header, ("Etat", "État"))
    idx_nom = _first_index(header, ("Nom",))
    idx_hostname = _first_index(header, ("Hostname",))
    print(f"[INFO] {len(rows)} lignes (delim={delim!r}, idx_etat={idx_etat})")
    if idx_etat == -1:
        print("[WARN] Colonne Etat absente, skip.")
        return

    for row in rows:
        # "Nom" takes precedence over "Hostname" whenever the row reaches it.
        if 0 <= idx_nom < len(row):
            hostname = row[idx_nom].strip()
        elif 0 <= idx_hostname < len(row):
            hostname = row[idx_hostname].strip()
        else:
            hostname = ""
        # Skip rows without a name containing at least one letter.
        if not hostname or not any(c.isalpha() for c in hostname):
            continue

        raw = row[idx_etat].strip() if idx_etat < len(row) else ""
        new_etat = norm_etat(raw)
        is_condition = raw in _ITOP_CONDITIONS
        if new_etat is None and not is_condition:
            # Neither lifecycle nor condition: report real text, ignore
            # empty cells and placeholders, and leave the server untouched.
            if raw and raw not in _EMPTY_MARKERS:
                unknown.add(raw)
            continue

        target = new_etat  # None when the cell holds a condition label
        srv = conn.execute(text("SELECT id, etat FROM servers WHERE hostname=:h"),
                           {"h": hostname}).fetchone()
        if not srv:
            stats["not_found"] += 1
            continue
        if srv.etat == target:
            stats["unchanged"] += 1
            continue
        if dry_run:
            print(f" DRY: {hostname} {srv.etat} -> {target or 'NULL'}")
        else:
            conn.execute(text("UPDATE servers SET etat=:e WHERE id=:sid"),
                         {"e": target, "sid": srv.id})
        stats["updated"] += 1


def main():
    """Migrate servers.etat to verbatim iTop labels, then re-sync from CSV.

    Command line: zero or more CSV paths plus an optional ``--dry-run`` flag.
    Steps: (1) drop the old CHECK and rewrite legacy values, (2) add the new
    CHECK, (3) re-read each CSV and update the matching servers.

    The connection runs in AUTOCOMMIT mode, so every statement is committed
    on its own; it is always closed, even when a statement fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("csv_paths", nargs="*", default=[])
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    engine = create_engine(DATABASE_URL)
    # Only the part after "@" is printed, which leaves the credentials out.
    print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
    conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
    try:
        # 1. Drop CHECK + migrate legacy values
        _migrate_legacy_values(conn, args.dry_run)

        # 2. CHECK with verbatim iTop labels
        _apply_check_constraint(conn, args.dry_run)

        # 3. Re-sync from CSV
        if args.csv_paths:
            stats = {"updated": 0, "unchanged": 0, "not_found": 0}
            unknown = set()
            for csv_path in args.csv_paths:
                _resync_from_csv(conn, csv_path, args.dry_run, stats, unknown)
            print(f"\n[DONE] Maj CSV: {stats['updated']} | Inchanges: {stats['unchanged']} "
                  f"| Hors base: {stats['not_found']}")
            if unknown:
                print(f"[WARN] Etats iTop non reconnus: {unknown}")
    finally:
        conn.close()
    print("[OK] Migration terminee")
|
|
|
|
|
|
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|