Migre etat vers labels iTop verbatim (Production, Nouveau, etc.)

Aligne la colonne servers.etat sur les valeurs iTop exactes au lieu
des codes lowercase internes.

Impact:
- servers.etat stocke: Production, Implémentation, Stock, Obsolète,
  EOL, prêt, tests, Nouveau, A récupérer, Cassé, Cédé, En panne,
  Perdu, Recyclé, Occasion, A détruire, Volé
- Remplace tous les 'production'/'obsolete'/'stock'/'eol'/'implementation'
  en WHERE/comparisons par les labels iTop verbatim (~10 fichiers)
- Templates badges/filtres: valeurs + labels iTop
- itop_service: maintient mapping iTop API internal code <-> DB label
- import_sanef_*: norm_etat retourne la valeur iTop verbatim ou None
  (plus de fallback silencieux sur 'production')

Ajoute:
- tools/import_etat_itop.py : migration lowercase -> iTop + re-import CSV
- tools/import_environnement.py : fix dry-run pour ADD COLUMN idempotent

Supprime:
- tools/fix_etat_extend.py (obsolete par import_etat_itop.py)
This commit is contained in:
Pierre & Lumière 2026-04-14 18:40:56 +02:00
parent 242641a085
commit 753d4076c9
19 changed files with 224 additions and 205 deletions

View File

@ -338,7 +338,7 @@ async def applications_assign_page(request: Request, app_id: int, db=Depends(get
if not app:
return RedirectResponse(url="/admin/applications?msg=notfound", status_code=303)
where = ["s.etat NOT IN ('stock','obsolete')"]
where = ["s.etat NOT IN ('Stock','Obsolète')"]
params = {}
if search:
where.append("s.hostname ILIKE :s"); params["s"] = f"%{search}%"

View File

@ -134,7 +134,7 @@ async def audit_global(request: Request, db=Depends(get_db)):
parallel = int(form.get("parallel", "5"))
# Construire la requete
where = ["s.os_family = 'linux'", "s.etat = 'production'"]
where = ["s.os_family = 'linux'", "s.etat = 'Production'"]
params = {}
if exclude_domains:
where.append("d.code NOT IN :ed")

View File

@ -18,21 +18,21 @@ async def dashboard(request: Request, db=Depends(get_db)):
# Stats generales
stats = {}
stats["total_servers"] = db.execute(text("SELECT COUNT(*) FROM servers")).scalar()
stats["patchable"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE patch_os_owner='secops' AND etat='production'")).scalar()
stats["patchable"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE patch_os_owner='secops' AND etat='Production'")).scalar()
stats["linux"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE os_family='linux'")).scalar()
stats["windows"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE os_family='windows'")).scalar()
stats["decom"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE etat='obsolete'")).scalar()
stats["decom"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE etat='Obsolète'")).scalar()
stats["obsolete"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE licence_support='obsolete'")).scalar()
stats["qualys_assets"] = db.execute(text("SELECT COUNT(*) FROM qualys_assets")).scalar()
stats["qualys_tags"] = db.execute(text("SELECT COUNT(*) FROM qualys_tags")).scalar()
stats["qualys_active"] = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%active%' AND agent_status NOT ILIKE '%inactive%'")).scalar()
stats["qualys_inactive"] = db.execute(text("SELECT COUNT(*) FROM qualys_assets WHERE agent_status ILIKE '%inactive%'")).scalar()
stats["qualys_no_agent"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE etat='production' AND NOT EXISTS (SELECT 1 FROM qualys_assets qa WHERE LOWER(qa.hostname) = LOWER(servers.hostname))")).scalar()
stats["qualys_no_agent"] = db.execute(text("SELECT COUNT(*) FROM servers WHERE etat='Production' AND NOT EXISTS (SELECT 1 FROM qualys_assets qa WHERE LOWER(qa.hostname) = LOWER(servers.hostname))")).scalar()
# Par domaine
domains = db.execute(text("""
SELECT d.name, d.code, COUNT(s.id) as total,
COUNT(*) FILTER (WHERE s.etat='production') as actifs,
COUNT(*) FILTER (WHERE s.etat='Production') as actifs,
COUNT(*) FILTER (WHERE s.os_family='linux') as linux,
COUNT(*) FILTER (WHERE s.os_family='windows') as windows
FROM servers s

View File

@ -40,7 +40,7 @@ def _get_planning_data(db, year):
SELECT d.code, d.name, COUNT(s.id) as srv_count
FROM domains d
LEFT JOIN domain_environments de ON de.domain_id = d.id
LEFT JOIN servers s ON s.domain_env_id = de.id AND s.etat = 'production'
LEFT JOIN servers s ON s.domain_env_id = de.id AND s.etat = 'Production'
GROUP BY d.code, d.name, d.display_order
ORDER BY d.display_order
""")).fetchall()

View File

@ -124,7 +124,7 @@ def get_servers_for_planning(db, year, week_number):
or_clauses.append("z.name = 'DMZ'")
where = f"""
s.etat = 'production' AND s.patch_os_owner = 'secops'
s.etat = 'Production' AND s.patch_os_owner = 'secops'
AND s.licence_support IN ('active', 'els') AND s.os_family = 'linux'
AND ({' OR '.join(or_clauses)})
"""

View File

@ -53,7 +53,7 @@ def detect_correspondances(db, dry_run=False):
# Tous les serveurs actifs (exclut stock/obsolete)
rows = db.execute(text("""SELECT id, hostname FROM servers
WHERE etat NOT IN ('stock','obsolete','eol') ORDER BY hostname""")).fetchall()
WHERE etat NOT IN ('Stock','Obsolète','EOL') ORDER BY hostname""")).fetchall()
by_signature = defaultdict(list) # signature -> [(server_id, env_char, hostname)]
for r in rows:
@ -125,7 +125,7 @@ def detect_correspondances(db, dry_run=False):
def get_servers_for_builder(db, search="", app="", domain="", env=""):
"""Retourne tous les serveurs matchant les filtres, avec leurs correspondances existantes.
Exclut les serveurs en stock / obsolete (décommissionnés, EOL)."""
where = ["s.etat NOT IN ('stock','obsolete','eol')"]
where = ["s.etat NOT IN ('Stock','Obsolète','EOL')"]
params = {}
if search:
where.append("s.hostname ILIKE :s"); params["s"] = f"%{search}%"
@ -181,7 +181,7 @@ def bulk_create_correspondance(db, prod_ids, nonprod_ids, env_labels, user_id):
def get_correspondance_view(db, search="", app="", env=""):
"""Vue hiérarchique des correspondances groupées par application.
Exclut les serveurs en stock/obsolete."""
where = ["s.etat NOT IN ('stock','obsolete','eol')"]
where = ["s.etat NOT IN ('Stock','Obsolète','EOL')"]
params = {}
if search:
where.append("s.hostname ILIKE :s"); params["s"] = f"%{search}%"
@ -330,7 +330,7 @@ def get_orphan_nonprod(db):
LEFT JOIN environments e ON de.environment_id = e.id
LEFT JOIN domains d ON de.domain_id = d.id
WHERE e.name IS NOT NULL AND e.name NOT ILIKE '%production%'
AND s.etat NOT IN ('stock','obsolete','eol')
AND s.etat NOT IN ('Stock','Obsolète','EOL')
AND NOT EXISTS (SELECT 1 FROM server_correspondance sc WHERE sc.nonprod_server_id = s.id)
ORDER BY s.application_name, s.hostname
LIMIT 500

View File

@ -337,10 +337,16 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
"patch_excludes,domain_ldap_name,last_patch_date,"
"applicationsolution_list")
# PatchCenter etat = iTop status (meme enum: production, implementation, stock, obsolete, eol)
itop_status = {"production": "production", "stock": "stock",
"implementation": "implementation", "obsolete": "obsolete",
"eol": "eol"}
# PatchCenter etat = label iTop verbatim (Production, Implémentation, Stock, Obsolète, EOL, prêt, tests, Nouveau, ...)
itop_status = {
"production": "Production", "implementation": "Implémentation",
"stock": "Stock", "obsolete": "Obsolète", "eol": "EOL",
"pret": "prêt", "tests": "tests", "nouveau": "Nouveau",
"casse": "Cassé", "cede": "Cédé", "en_panne": "En panne",
"a_recuperer": "A récupérer", "perdu": "Perdu",
"recycle": "Recyclé", "occasion": "Occasion",
"a_detruire": "A détruire", "vole": "Volé",
}
for v in vms:
hostname = v.get("name", "").split(".")[0].lower()
@ -404,7 +410,7 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
"os_family": "linux" if "linux" in v.get("osfamily_id_friendlyname", "").lower() else "windows",
"os_version": v.get("osversion_id_friendlyname", ""),
"machine_type": "vm",
"etat": itop_status.get(v.get("status", ""), "production"),
"etat": itop_status.get(v.get("status", ""), "Production"),
"de_id": de_id, "zone_id": zone_id,
"resp_srv": resp_srv_name,
"resp_srv_email": person_email.get(resp_srv_name.lower(), ""),
@ -499,7 +505,7 @@ def sync_from_itop(db, itop_url, itop_user, itop_pass):
try:
db.execute(text("""INSERT INTO servers (hostname, fqdn, os_family, os_version, machine_type,
etat, responsable_nom, commentaire, site, ssh_port, ssh_user, ssh_method, tier)
VALUES (:h, :f, :osf, :osv, 'physical', 'production', :resp, :desc, :site,
VALUES (:h, :f, :osf, :osv, 'physical', 'Production', :resp, :desc, :site,
22, 'root', 'ssh_key', 'tier0')"""),
{"h": hostname, "f": s.get("name", hostname), "osf": osf, "osv": osv,
"resp": resp, "desc": s.get("description", ""),
@ -562,8 +568,16 @@ def sync_to_itop(db, itop_url, itop_user, itop_pass):
for v in client.get_all("VirtualMachine", "name"):
itop_vms[v["name"].split(".")[0].lower()] = v
status_map = {"production": "production", "implementation": "implementation",
"stock": "stock", "obsolete": "obsolete", "eol": "eol"}
# DB (iTop label verbatim) -> iTop API internal code
status_map = {
"Production": "production", "Implémentation": "implementation",
"Stock": "stock", "Obsolète": "obsolete", "EOL": "eol",
"prêt": "pret", "tests": "tests", "Nouveau": "nouveau",
"Cassé": "casse", "Cédé": "cede", "En panne": "en_panne",
"A récupérer": "a_recuperer", "Perdu": "perdu",
"Recyclé": "recycle", "Occasion": "occasion",
"A détruire": "a_detruire", "Volé": "vole",
}
tier_map = {"tier0": "Tier 0", "tier1": "Tier 1", "tier2": "Tier 2", "tier3": "Tier 3"}
# Build OSVersion cache: name.lower() → itop_id
@ -599,7 +613,7 @@ def sync_to_itop(db, itop_url, itop_user, itop_pass):
if srv.mgmt_ip:
fields["managementip"] = srv.mgmt_ip.split("/")[0]
if srv.etat:
fields["status"] = status_map.get(srv.etat, "production")
fields["status"] = status_map.get(srv.etat, "production") # iTop API internal code
if srv.commentaire:
fields["description"] = srv.commentaire
if srv.patch_excludes:

View File

@ -146,7 +146,7 @@ def _check_server(s):
result["eligible"] = False
result["exclude_reason"] = "obsolete"
result["exclude_detail"] = "Licence EOL"
elif s.etat != 'production':
elif s.etat != 'Production':
result["eligible"] = False
result["exclude_reason"] = "non_patchable"
result["exclude_detail"] = f"Etat: {s.etat}"
@ -159,7 +159,7 @@ def _check_server(s):
result["exclude_detail"] = "Licence EOL — serveur non supporte"
return result
if s.etat != 'production':
if s.etat != 'Production':
result["eligible"] = False
result["exclude_reason"] = "non_patchable"
result["exclude_detail"] = f"Etat: {s.etat}"
@ -256,7 +256,7 @@ def _auto_exclude(db, campaign_id):
JOIN servers s ON ps.server_id = s.id
WHERE ps.campaign_id = :cid AND ps.status = 'pending'
AND (s.licence_support = 'obsolete'
OR s.etat != 'production'
OR s.etat != 'Production'
OR ps.prereq_ssh = 'ko'
OR ps.prereq_disk_ok = false)
"""), {"cid": campaign_id}).fetchall()
@ -265,7 +265,7 @@ def _auto_exclude(db, campaign_id):
for s in non_eligible:
if s.licence_support == 'obsolete':
reason, detail = "obsolete", "Licence EOL — auto-exclu"
elif s.etat != 'production':
elif s.etat != 'Production':
reason, detail = "non_patchable", f"Etat {s.etat} — auto-exclu"
elif s.prereq_disk_ok is False:
reason, detail = "creneau_inadequat", "Espace disque insuffisant — auto-exclu"

View File

@ -86,7 +86,7 @@ def get_eligible_servers(db):
LEFT JOIN environments e ON de.environment_id = e.id
LEFT JOIN quickwin_server_config qc ON qc.server_id = s.id
WHERE s.os_family = 'linux'
AND s.etat = 'production'
AND s.etat = 'Production'
AND s.patch_os_owner = 'secops'
ORDER BY e.display_order, d.display_order, s.hostname
""")).fetchall()
@ -184,7 +184,7 @@ def get_available_servers(db, run_id, search="", domains=None, envs=None, zones=
LEFT JOIN environments e ON de.environment_id = e.id
LEFT JOIN zones z ON s.zone_id = z.id
WHERE s.os_family = 'linux'
AND s.etat = 'production'
AND s.etat = 'Production'
AND s.patch_os_owner = 'secops'
AND s.id NOT IN (SELECT server_id FROM quickwin_entries WHERE run_id = :rid)
ORDER BY d.name, e.name, s.hostname
@ -211,7 +211,7 @@ def get_available_filters(db, run_id):
LEFT JOIN environments e ON de.environment_id = e.id
LEFT JOIN zones z ON s.zone_id = z.id
WHERE s.os_family = 'linux'
AND s.etat = 'production'
AND s.etat = 'Production'
AND s.patch_os_owner = 'secops'
AND s.id NOT IN (SELECT server_id FROM quickwin_entries WHERE run_id = :rid)
"""), {"rid": run_id}).fetchall()

View File

@ -51,7 +51,7 @@
<div class="flex justify-between"><span class="text-gray-500">Environnement</span><span class="badge {% if s.environnement == 'Production' %}badge-green{% else %}badge-yellow{% endif %}">{{ s.environnement }}</span></div>
<div class="flex justify-between"><span class="text-gray-500">Zone</span><span class="badge {% if s.zone == 'DMZ' %}badge-red{% else %}badge-blue{% endif %}">{{ s.zone or 'LAN' }}</span></div>
<div class="flex justify-between"><span class="text-gray-500">Tier</span><span class="badge {% if s.tier == 'tier0' %}badge-red{% elif s.tier == 'tier1' %}badge-yellow{% else %}badge-blue{% endif %}">{{ s.tier }}</span></div>
<div class="flex justify-between"><span class="text-gray-500">Etat</span><span class="badge {% if s.etat == 'production' %}badge-green{% elif s.etat in ('obsolete','eol') %}badge-red{% else %}badge-yellow{% endif %}">{% if s.etat == 'obsolete' %}Décommissionné{% elif s.etat == 'eol' %}EOL{% else %}{{ s.etat }}{% endif %}</span></div>
<div class="flex justify-between"><span class="text-gray-500">Etat</span><span class="badge {% if s.etat == 'Production' %}badge-green{% elif s.etat in ('Obsolète','EOL') %}badge-red{% else %}badge-yellow{% endif %}">{{ s.etat }}</span></div>
</div>
</div>

View File

@ -58,7 +58,7 @@
<div>
<label class="text-xs text-gray-500">Etat</label>
<select name="etat" class="w-full">
{% for e,l in [('production','Production'),('implementation','Implementation'),('stock','Stock'),('obsolete','Décommissionné'),('eol','EOL')] %}<option value="{{ e }}" {% if e == s.etat %}selected{% endif %}>{{ l }}</option>{% endfor %}
{% for e in ['Production','Implémentation','Stock','Obsolète','EOL','prêt','tests','Nouveau','En panne','Cassé','Cédé','A récupérer','Perdu','Recyclé','Occasion','A détruire','Volé'] %}<option value="{{ e }}" {% if e == s.etat %}selected{% endif %}>{{ e }}</option>{% endfor %}
</select>
</div>
<div>

View File

@ -212,7 +212,7 @@ function refreshAgents() {
<td class="p-2 text-center text-gray-400">{{ s.domain or '-' }}</td>
<td class="p-2 text-center">{{ s.env or '-' }}</td>
<td class="p-2 text-center">{% if s.zone == 'DMZ' %}<span class="badge badge-red">DMZ</span>{% else %}{{ s.zone or '-' }}{% endif %}</td>
<td class="p-2 text-center" title="{{ s.etat or '' }}"><span class="badge {% if s.etat == 'production' %}badge-green{% elif s.etat in ('obsolete','eol') %}badge-red{% elif s.etat == 'stock' %}badge-gray{% else %}badge-yellow{% endif %}">{% if s.etat == 'obsolete' %}Décom.{% elif s.etat == 'eol' %}EOL{% elif s.etat == 'production' %}Prod{% else %}{{ (s.etat or '-')[:8] }}{% endif %}</span></td>
<td class="p-2 text-center" title="{{ s.etat or '' }}"><span class="badge {% if s.etat == 'Production' %}badge-green{% elif s.etat in ('Obsolète','EOL') %}badge-red{% elif s.etat == 'Stock' %}badge-gray{% else %}badge-yellow{% endif %}">{% if s.etat == 'Obsolète' %}Décom.{% elif s.etat == 'EOL' %}EOL{% elif s.etat == 'Production' %}Prod{% else %}{{ (s.etat or '-')[:8] }}{% endif %}</span></td>
</tr>
{% endfor %}
</tbody>

View File

@ -127,7 +127,7 @@
<td class="p-2 text-center text-gray-400">{{ s.domain or '-' }}</td>
<td class="p-2 text-center">{{ s.env or '-' }}</td>
<td class="p-2 text-center">
{% if s.etat == 'production' %}<span class="badge badge-green">Prod</span>
{% if s.etat == 'Production' %}<span class="badge badge-green">Prod</span>
{% else %}{{ s.etat or '-' }}{% endif %}
</td>
<td class="p-2 text-center">

View File

@ -36,7 +36,7 @@
{% for t in ['tier0','tier1','tier2','tier3'] %}<option value="{{ t }}" {% if filters.tier == t %}selected{% endif %}>{{ t }}</option>{% endfor %}
</select>
<select name="etat" onchange="this.form.submit()"><option value="">Etat</option>
{% for e,l in [('production','Production'),('implementation','Implementation'),('stock','Stock'),('obsolete','Décommissionné'),('eol','EOL')] %}<option value="{{ e }}" {% if filters.etat == e %}selected{% endif %}>{{ l }}</option>{% endfor %}
{% for e in ['Production','Implémentation','Stock','Obsolète','EOL','prêt','tests','Nouveau','En panne','Cassé','Cédé','A récupérer','Perdu','Recyclé','Occasion','A détruire','Volé'] %}<option value="{{ e }}" {% if filters.etat == e %}selected{% endif %}>{{ e }}</option>{% endfor %}
</select>
<select name="os" onchange="this.form.submit()"><option value="">OS</option>
<option value="linux" {% if filters.os == 'linux' %}selected{% endif %}>Linux</option>
@ -82,7 +82,7 @@ const bulkValues = {
domain_code: [{% for d in domains_list %}{v:"{{ d.code }}", l:"{{ d.name }}"},{% endfor %}],
env_code: [{% for e in envs_list %}{v:"{{ e.code }}", l:"{{ e.name }}"},{% endfor %}],
tier: [{v:"tier0",l:"tier0"},{v:"tier1",l:"tier1"},{v:"tier2",l:"tier2"},{v:"tier3",l:"tier3"}],
etat: [{v:"production",l:"Production"},{v:"implementation",l:"Implementation"},{v:"stock",l:"Stock"},{v:"obsolete",l:"Décommissionné"},{v:"eol",l:"EOL"}],
etat: [{v:"Production",l:"Production"},{v:"Implémentation",l:"Implémentation"},{v:"Stock",l:"Stock"},{v:"Obsolète",l:"Obsolète"},{v:"EOL",l:"EOL"},{v:"prêt",l:"prêt"},{v:"tests",l:"tests"},{v:"Nouveau",l:"Nouveau"},{v:"En panne",l:"En panne"},{v:"Cassé",l:"Cassé"},{v:"Cédé",l:"Cédé"},{v:"A récupérer",l:"A récupérer"},{v:"Perdu",l:"Perdu"},{v:"Recyclé",l:"Recyclé"},{v:"Occasion",l:"Occasion"},{v:"A détruire",l:"A détruire"},{v:"Volé",l:"Volé"}],
patch_os_owner: [{v:"secops",l:"secops"},{v:"ipop",l:"ipop"},{v:"na",l:"na"}],
licence_support: [{v:"active",l:"active"},{v:"obsolete",l:"obsolete"},{v:"els",l:"els"}],
};
@ -135,7 +135,7 @@ function updateBulk() {
<td class="p-2 text-center text-xs text-gray-400" title="{{ s.os_version or '' }}">{{ s.os_short or '-' }}</td>
<td class="p-2 text-center"><span class="badge {% if s.licence_support == 'active' %}badge-green{% elif s.licence_support == 'obsolete' %}badge-red{% elif s.licence_support == 'els' %}badge-yellow{% else %}badge-gray{% endif %}">{{ s.licence_support }}</span></td>
<td class="p-2 text-center"><span class="badge {% if s.tier == 'tier0' %}badge-red{% elif s.tier == 'tier1' %}badge-yellow{% elif s.tier == 'tier2' %}badge-blue{% else %}badge-green{% endif %}">{{ s.tier }}</span></td>
<td class="p-2 text-center"><span class="badge {% if s.etat == 'production' %}badge-green{% elif s.etat in ('obsolete','eol') %}badge-red{% else %}badge-yellow{% endif %}"title="{{ s.etat or '' }}">{% if s.etat == 'obsolete' %}Décom.{% elif s.etat == 'eol' %}EOL{% elif s.etat == 'production' %}Prod{% elif s.etat == 'implementation' %}Implm{% elif s.etat == 'stock' %}Stock{% else %}{{ (s.etat or '')[:8] }}{% endif %}</span></td>
<td class="p-2 text-center"><span class="badge {% if s.etat == 'Production' %}badge-green{% elif s.etat in ('Obsolète','EOL') %}badge-red{% else %}badge-yellow{% endif %}"title="{{ s.etat or '' }}">{% if s.etat == 'Obsolète' %}Décom.{% elif s.etat == 'EOL' %}EOL{% elif s.etat == 'Production' %}Prod{% elif s.etat == 'Implémentation' %}Implm{% elif s.etat == 'Stock' %}Stock{% else %}{{ (s.etat or '')[:8] }}{% endif %}</span></td>
<td class="p-2 text-center text-xs">{{ s.patch_os_owner or '-' }}</td>
<td class="p-2 text-xs text-gray-300" title="{{ s.application_name or '' }}">{{ (s.application_name or '-')[:35] }}</td>
<td class="p-2 text-xs" onclick="event.stopPropagation()" style="max-width:220px">

View File

@ -1,139 +0,0 @@
"""Etend le CHECK constraint etat + re-map depuis les CSV iTop.
Ajoute les etats 'condition physique' iTop manquants puis relit les CSV
(Server, VirtualMachine, Hyperviseur, Serveur physique) pour mettre a jour
uniquement la colonne etat des serveurs existants.
Usage:
python tools/fix_etat_extend.py <csv1> [<csv2> ...] [--dry-run]
"""
import os
import csv
import argparse
import unicodedata
from sqlalchemy import create_engine, text
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
# Tous les etats iTop -> valeur normalisee
ETAT_MAP = {
"production": "production",
"implémentation": "implementation",
"implementation": "implementation",
"stock": "stock",
"obsolète": "obsolete",
"obsolete": "obsolete",
"eol": "eol",
"prêt": "pret",
"pret": "pret",
"tests": "tests",
"test": "tests",
"a récupérer": "a_recuperer",
"a recuperer": "a_recuperer",
"à récupérer": "a_recuperer",
"cassé": "casse",
"casse": "casse",
"cédé": "cede",
"cede": "cede",
"en panne": "en_panne",
"nouveau": "nouveau",
"perdu": "perdu",
"recyclé": "recycle",
"recycle": "recycle",
"occasion": "occasion",
"a détruire": "a_detruire",
"à détruire": "a_detruire",
"a detruire": "a_detruire",
"volé": "vole",
"vole": "vole",
}
ALLOWED = sorted(set(ETAT_MAP.values()))
def strip_accents(s):
return "".join(c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn")
def norm_etat(raw):
if not raw:
return None
key = raw.strip().lower()
if key in ("-", "(null)", ""):
return None
if key in ETAT_MAP:
return ETAT_MAP[key]
# fallback: sans accent
no_acc = strip_accents(key)
if no_acc in ETAT_MAP:
return ETAT_MAP[no_acc]
return None
def main():
parser = argparse.ArgumentParser()
parser.add_argument("csv_paths", nargs="+")
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
# 1. Etendre le CHECK constraint
print("[INFO] Etend le CHECK constraint etat...")
conn.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_etat_check"))
allowed_sql = ", ".join(f"'{v}'" for v in ALLOWED)
conn.execute(text(f"ALTER TABLE servers ADD CONSTRAINT servers_etat_check CHECK (etat IN ({allowed_sql}) OR etat IS NULL)"))
print(f"[INFO] Etats autorises: {ALLOWED}")
# 2. Relire CSV et mettre a jour
updated = 0
unchanged = 0
not_found = 0
unknown_etats = set()
for csv_path in args.csv_paths:
print(f"\n[INFO] Lecture {csv_path}")
with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f)
rows = list(reader)
print(f"[INFO] {len(rows)} lignes")
for r in rows:
hostname = (r.get("Nom") or r.get("Hostname") or "").strip()
if not hostname or not any(c.isalpha() for c in hostname):
continue
raw_etat = r.get("Etat") or r.get("État") or r.get("Status")
new_etat = norm_etat(raw_etat)
if new_etat is None and raw_etat and raw_etat.strip() not in ("-", "(null)", ""):
unknown_etats.add(raw_etat.strip())
continue
if new_etat is None:
continue
srv = conn.execute(text("SELECT id, etat FROM servers WHERE hostname=:h"),
{"h": hostname}).fetchone()
if not srv:
not_found += 1
continue
if srv.etat == new_etat:
unchanged += 1
continue
if args.dry_run:
print(f" DRY: {hostname} {srv.etat} -> {new_etat}")
else:
conn.execute(text("UPDATE servers SET etat=:e WHERE id=:sid"),
{"e": new_etat, "sid": srv.id})
updated += 1
conn.close()
print(f"\n[DONE] Maj: {updated} | Inchanges: {unchanged} | Hors base: {not_found}")
if unknown_etats:
print(f"[WARN] Etats non mappes rencontres: {unknown_etats}")
if __name__ == "__main__":
main()

View File

@ -59,16 +59,15 @@ def main():
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
# 1. Ajoute colonne + CHECK
# 1. Ajoute colonne + CHECK (idempotent, toujours execute y compris dry-run)
print("[INFO] Ajoute colonne environnement...")
if not args.dry_run:
conn.execute(text("ALTER TABLE servers ADD COLUMN IF NOT EXISTS environnement varchar(20)"))
conn.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_environnement_check"))
allowed_sql = ", ".join(f"'{v}'" for v in ITOP_ENVS)
conn.execute(text(
f"ALTER TABLE servers ADD CONSTRAINT servers_environnement_check "
f"CHECK (environnement IN ({allowed_sql}) OR environnement IS NULL)"
))
conn.execute(text("ALTER TABLE servers ADD COLUMN IF NOT EXISTS environnement varchar(20)"))
conn.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_environnement_check"))
allowed_sql = ", ".join(f"'{v}'" for v in ITOP_ENVS)
conn.execute(text(
f"ALTER TABLE servers ADD CONSTRAINT servers_environnement_check "
f"CHECK (environnement IN ({allowed_sql}) OR environnement IS NULL)"
))
print(f"[INFO] Envs iTop autorises: {ITOP_ENVS}")
# 2. Relecture CSV -> update

143
tools/import_etat_itop.py Normal file
View File

@ -0,0 +1,143 @@
"""Migration etat: lowercase codes -> labels iTop verbatim + import CSV.
Aligne la colonne servers.etat sur les valeurs iTop exactes:
Lifecycle: Production, Implémentation, Stock, Obsolète, EOL, prêt, tests
Condition: Nouveau, A récupérer, Cassé, Cédé, En panne, Perdu,
Recyclé, Occasion, A détruire, Volé
Etapes:
1. DROP ancien CHECK, UPDATE lowercase -> iTop verbatim
2. ADD nouveau CHECK avec labels iTop
3. Relecture CSV pour re-synchroniser depuis iTop
Usage:
python tools/import_etat_itop.py <csv1> [<csv2> ...] [--dry-run]
"""
import os
import csv
import argparse
from sqlalchemy import create_engine, text
DATABASE_URL = os.getenv("DATABASE_URL_DEMO") or os.getenv("DATABASE_URL") \
or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_demo"
ITOP_ETATS = [
# Lifecycle
"Production", "Implémentation", "Stock", "Obsolète", "EOL", "prêt", "tests",
# Condition
"Nouveau", "A récupérer", "Cassé", "Cédé", "En panne",
"Perdu", "Recyclé", "Occasion", "A détruire", "Volé",
]
# Migration valeurs existantes (lowercase) -> labels iTop verbatim
LEGACY_MAP = {
"production": "Production",
"implementation": "Implémentation",
"stock": "Stock",
"obsolete": "Obsolète",
"eol": "EOL",
"pret": "prêt",
"tests": "tests",
"nouveau": "Nouveau",
"casse": "Cassé",
"cede": "Cédé",
"en_panne": "En panne",
"a_recuperer": "A récupérer",
"perdu": "Perdu",
"recycle": "Recyclé",
"occasion": "Occasion",
"a_detruire": "A détruire",
"vole": "Volé",
}
def norm_etat(raw):
if not raw:
return None
s = raw.strip()
if not s or s in ("-", "(null)"):
return None
if s in ITOP_ETATS:
return s
# Tolerance case-insensitive + sans accent pour les alias courants
low = s.lower()
if low in LEGACY_MAP:
return LEGACY_MAP[low]
return None
def main():
parser = argparse.ArgumentParser()
parser.add_argument("csv_paths", nargs="*", default=[])
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
engine = create_engine(DATABASE_URL)
print(f"[INFO] DB: {DATABASE_URL.split('@')[-1]}")
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
# 1. Drop CHECK + migrate legacy values
print("[INFO] Drop ancien CHECK...")
if not args.dry_run:
conn.execute(text("ALTER TABLE servers DROP CONSTRAINT IF EXISTS servers_etat_check"))
print("[INFO] Migration lowercase -> iTop verbatim...")
for old, new in LEGACY_MAP.items():
cnt = conn.execute(text("SELECT COUNT(*) FROM servers WHERE etat=:o"), {"o": old}).scalar()
if cnt:
print(f" {old:18s} -> {new:18s} : {cnt}")
if not args.dry_run:
conn.execute(text("UPDATE servers SET etat=:n WHERE etat=:o"), {"n": new, "o": old})
# 2. CHECK iTop verbatim
allowed_sql = ", ".join(f"'{v}'" for v in ITOP_ETATS)
if not args.dry_run:
conn.execute(text(
f"ALTER TABLE servers ADD CONSTRAINT servers_etat_check "
f"CHECK (etat IN ({allowed_sql}) OR etat IS NULL)"
))
print(f"[INFO] CHECK iTop applique: {ITOP_ETATS}")
# 3. Re-sync CSV
if args.csv_paths:
updated = unchanged = not_found = 0
unknown = set()
for csv_path in args.csv_paths:
print(f"\n[INFO] Lecture {csv_path}")
with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
sample = f.read(4096); f.seek(0)
delim = ";" if sample.count(";") > sample.count(",") else ","
rows = list(csv.DictReader(f, delimiter=delim))
print(f"[INFO] {len(rows)} lignes (delim={delim!r})")
for r in rows:
hostname = (r.get("Nom") or r.get("Hostname") or "").strip()
if not hostname or not any(c.isalpha() for c in hostname):
continue
raw = (r.get("Etat") or r.get("État") or "").strip()
new_etat = norm_etat(raw)
if raw and new_etat is None and raw not in ("-", "(null)"):
unknown.add(raw); continue
if new_etat is None:
continue
srv = conn.execute(text("SELECT id, etat FROM servers WHERE hostname=:h"),
{"h": hostname}).fetchone()
if not srv:
not_found += 1; continue
if srv.etat == new_etat:
unchanged += 1; continue
if args.dry_run:
print(f" DRY: {hostname} {srv.etat} -> {new_etat}")
else:
conn.execute(text("UPDATE servers SET etat=:e WHERE id=:sid"),
{"e": new_etat, "sid": srv.id})
updated += 1
print(f"\n[DONE] Maj CSV: {updated} | Inchanges: {unchanged} | Hors base: {not_found}")
if unknown:
print(f"[WARN] Etats iTop non reconnus: {unknown}")
conn.close()
print("[OK] Migration terminee")
if __name__ == "__main__":
main()

View File

@ -26,19 +26,19 @@ def norm_os_family(famille):
return None
ITOP_ETATS = {
"Production", "Implémentation", "Stock", "Obsolète", "EOL", "prêt", "tests",
"Nouveau", "A récupérer", "Cassé", "Cédé", "En panne",
"Perdu", "Recyclé", "Occasion", "A détruire", "Volé",
}
def norm_etat(status, etat):
"""Mapper vers les valeurs autorisées par le CHECK constraint:
production, implementation, stock, obsolete, eol."""
e = (etat or "").strip().lower()
if "stock" in e:
return "stock"
if "implémentation" in e or "implementation" in e:
return "implementation"
if "obsol" in e:
return "obsolete"
if "eol" in e:
return "eol"
return "production"
"""Retourne la valeur iTop verbatim si reconnue, sinon None (pas de fallback silencieux)."""
raw = (etat or "").strip()
if not raw or raw == "-":
return None
return raw if raw in ITOP_ETATS else None
def main():

View File

@ -25,17 +25,19 @@ def norm_os_family(famille):
return None
ITOP_ETATS = {
"Production", "Implémentation", "Stock", "Obsolète", "EOL", "prêt", "tests",
"Nouveau", "A récupérer", "Cassé", "Cédé", "En panne",
"Perdu", "Recyclé", "Occasion", "A détruire", "Volé",
}
def norm_etat(status, etat):
e = (etat or "").strip().lower()
if "stock" in e:
return "stock"
if "implémentation" in e or "implementation" in e:
return "implementation"
if "obsol" in e:
return "obsolete"
if "eol" in e:
return "eol"
return "production"
"""Retourne la valeur iTop verbatim si reconnue, sinon None (pas de fallback silencieux)."""
raw = (etat or "").strip()
if not raw or raw == "-":
return None
return raw if raw in ITOP_ETATS else None
def main():