patchcenter/app/services/itop_service.py
Admin MPCZ 753d4076c9 Migre etat vers labels iTop verbatim (Production, Nouveau, etc.)
Aligne la colonne servers.etat sur les valeurs iTop exactes au lieu
des codes lowercase internes.

Impact:
- servers.etat stocke: Production, Implémentation, Stock, Obsolète,
  EOL, prêt, tests, Nouveau, A récupérer, Cassé, Cédé, En panne,
  Perdu, Recyclé, Occasion, A détruire, Volé
- Remplace tous les 'production'/'obsolete'/'stock'/'eol'/'implementation'
  en WHERE/comparisons par les labels iTop verbatim (~10 fichiers)
- Templates badges/filtres: valeurs + labels iTop
- itop_service: maintient mapping iTop API internal code <-> DB label
- import_sanef_*: norm_etat retourne la valeur iTop verbatim ou None
  (plus de fallback silencieux sur 'production')

Ajoute:
- tools/import_etat_itop.py : migration lowercase -> iTop + re-import CSV
- tools/import_environnement.py : fix dry-run pour ADD COLUMN idempotent

Supprime:
- tools/fix_etat_extend.py (obsolete par import_etat_itop.py)
2026-04-14 18:40:56 +02:00

693 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Service iTop REST API — synchronisation bidirectionnelle complète"""
import logging
import requests
import json
from datetime import datetime
from collections import defaultdict
from sqlalchemy import text
log = logging.getLogger(__name__)
class ITopClient:
"""Client REST iTop v1.3"""
def __init__(self, url, user, password):
self.url = url.rstrip("/") + "/webservices/rest.php?version=1.3"
self.user = user
self.password = password
def _call(self, operation, **kwargs):
data = {"operation": operation, **kwargs}
try:
r = requests.post(self.url,
data={"json_data": json.dumps(data),
"auth_user": self.user,
"auth_pwd": self.password},
verify=False, timeout=30)
return r.json()
except Exception as e:
log.error(f"iTop error: {e}")
return {"code": -1, "message": str(e)}
def get_all(self, cls, fields):
r = self._call("core/get", **{"class": cls, "key": f"SELECT {cls}", "output_fields": fields})
if r.get("code") == 0 and r.get("objects"):
return [{"itop_id": o["key"], **o["fields"]} for o in r["objects"].values()]
return []
def update(self, cls, key, fields):
return self._call("core/update", **{"class": cls, "key": str(key), "fields": fields, "comment": "PatchCenter sync"})
def create(self, cls, fields):
return self._call("core/create", **{"class": cls, "fields": fields, "comment": "PatchCenter sync"})
def _normalize_os_for_itop(os_string):
"""Normalise PRETTY_NAME vers un nom propre pour iTop OSVersion.
Ex: 'Debian GNU/Linux 12 (bookworm)''Debian 12 (Bookworm)'
'CentOS Stream 9''CentOS Stream 9'
'Red Hat Enterprise Linux 9.4 (Plow)''RHEL 9.4 (Plow)'
"""
import re
s = os_string.strip()
# Debian GNU/Linux X (codename) → Debian X (Codename)
m = re.match(r'Debian GNU/Linux (\d+)\s*\((\w+)\)', s, re.I)
if m:
return f"Debian {m.group(1)} ({m.group(2).capitalize()})"
# Ubuntu X.Y LTS
m = re.match(r'Ubuntu (\d+\.\d+(?:\.\d+)?)\s*(LTS)?', s, re.I)
if m:
lts = " LTS" if m.group(2) else ""
return f"Ubuntu {m.group(1)}{lts}"
# Red Hat Enterprise Linux [Server] [release] X → RHEL X
m = re.match(r'Red Hat Enterprise Linux\s*(?:Server)?\s*(?:release)?\s*(\d+[\.\d]*)\s*\(?([\w]*)\)?', s, re.I)
if m:
codename = f" ({m.group(2).capitalize()})" if m.group(2) else ""
return f"RHEL {m.group(1)}{codename}"
# CentOS Stream release X → CentOS Stream X
m = re.match(r'CentOS Stream\s+release\s+(\d+[\.\d]*)', s, re.I)
if m:
return f"CentOS Stream {m.group(1)}"
# CentOS Linux X.Y → CentOS X.Y
m = re.match(r'CentOS\s+Linux\s+(\d+[\.\d]*)', s, re.I)
if m:
return f"CentOS {m.group(1)}"
# Rocky Linux X.Y (codename) → Rocky Linux X.Y
m = re.match(r'Rocky\s+Linux\s+(\d+[\.\d]*)', s, re.I)
if m:
return f"Rocky Linux {m.group(1)}"
# Oracle Linux / Oracle Enterprise Linux
m = re.match(r'Oracle\s+(?:Enterprise\s+)?Linux\s+(\d+[\.\d]*)', s, re.I)
if m:
return f"Oracle Linux {m.group(1)}"
# Fallback: remove "release" word
return re.sub(r'\s+release\s+', ' ', s).strip()
def _upsert_ip(db, server_id, ip):
if not ip:
return
# Remove all old itop-managed primary IPs for this server (keep only the current one)
db.execute(text(
"DELETE FROM server_ips WHERE server_id=:sid AND ip_type='primary' AND description='itop' AND ip_address != :ip"),
{"sid": server_id, "ip": ip})
# Check if this exact IP already exists
exact = db.execute(text(
"SELECT id FROM server_ips WHERE server_id=:sid AND ip_address=:ip"),
{"sid": server_id, "ip": ip}).fetchone()
if exact:
return
try:
db.execute(text(
"INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh, description) VALUES (:sid, :ip, 'primary', true, 'itop')"),
{"sid": server_id, "ip": ip})
except Exception:
pass
def _save_sync_timestamp(db, direction, stats):
"""Enregistre le timestamp et les stats de la dernière synchro"""
key = f"last_sync_{direction}"
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
val = json.dumps({"date": now, "stats": stats})
existing = db.execute(text("SELECT key FROM settings WHERE key=:k"), {"k": key}).fetchone()
if existing:
db.execute(text("UPDATE settings SET value=:v WHERE key=:k"), {"k": key, "v": val})
else:
db.execute(text("INSERT INTO settings (key, value) VALUES (:k, :v)"), {"k": key, "v": val})
def get_last_sync(db, direction="from"):
"""Récupère la date et stats de la dernière synchro"""
key = f"last_sync_{direction}"
row = db.execute(text("SELECT value FROM settings WHERE key=:k"), {"k": key}).fetchone()
if row:
try:
return json.loads(row.value)
except Exception:
pass
return None
# ══════════════════════════════════════════════════════════
# IMPORT: iTop → PatchCenter
# ══════════════════════════════════════════════════════════
def sync_from_itop(db, itop_url, itop_user, itop_pass):
"""Import complet depuis iTop: typologies, contacts, serveurs"""
client = ITopClient(itop_url, itop_user, itop_pass)
stats = {"contacts": 0, "environments": 0, "domains": 0, "zones": 0,
"servers_created": 0, "servers_updated": 0, "ips": 0, "errors": []}
try:
db.rollback()
except Exception:
pass
# ─── 1. Typologies: Environnements ───
for item in client.get_all("Environnement", "name"):
name = item.get("name", "")
if not name:
continue
existing = db.execute(text("SELECT id FROM environments WHERE LOWER(name)=LOWER(:n)"), {"n": name}).fetchone()
if not existing:
try:
db.execute(text("INSERT INTO environments (name, code) VALUES (:n, :c)"),
{"n": name, "c": name[:10].upper().replace(" ", "").replace("-", "")})
db.commit()
stats["environments"] += 1
except Exception:
db.rollback()
# ─── 2. Typologies: Domaines applicatifs ───
for item in client.get_all("DomaineApplicatif", "name"):
name = item.get("name", "")
if not name:
continue
existing = db.execute(text("SELECT id FROM domains WHERE LOWER(name)=LOWER(:n)"), {"n": name}).fetchone()
if not existing:
try:
db.execute(text("INSERT INTO domains (name, code) VALUES (:n, :c)"),
{"n": name, "c": name[:10].upper().replace(" ", "")})
db.commit()
stats["domains"] += 1
except Exception:
db.rollback()
# ─── 3. Typologies: Zones ───
for item in client.get_all("Zone", "name"):
name = item.get("name", "")
if not name:
continue
existing = db.execute(text("SELECT id FROM zones WHERE LOWER(name)=LOWER(:n)"), {"n": name}).fetchone()
if not existing:
try:
db.execute(text("INSERT INTO zones (name, is_dmz) VALUES (:n, :d)"),
{"n": name, "d": "dmz" in name.lower()})
db.commit()
stats["zones"] += 1
except Exception:
db.rollback()
# ─── 4. Typologies: DomainLdap → domain_ltd_list ───
for item in client.get_all("DomainLdap", "name"):
name = item.get("name", "")
if not name:
continue
existing = db.execute(text("SELECT id FROM domain_ltd_list WHERE LOWER(name)=LOWER(:n)"), {"n": name}).fetchone()
if not existing:
try:
db.execute(text("INSERT INTO domain_ltd_list (name) VALUES (:n)"), {"n": name})
db.commit()
except Exception:
db.rollback()
# ─── 5. Contacts + Teams (filtre périmètre IT uniquement) ───
persons = client.get_all("Person", "name,first_name,email,phone,org_name,function,status")
# Périmètre IT : teams à synchroniser (configurable via settings)
from .secrets_service import get_secret as _gs
it_teams_raw = _gs(db, "itop_contact_teams")
it_teams_list = ["SecOps", "iPOP", "Externe", "DSI", "Admin DSI"]
if it_teams_raw:
it_teams_list = [t.strip() for t in it_teams_raw.split(",") if t.strip()]
# Map person → (itop_id, team)
person_info = {} # fullname_lower -> {itop_id, team}
team_members = {} # pour responsables domaine×env plus bas
teams = client.get_all("Team", "name,persons_list")
for t in teams:
team_name = t.get("name", "")
for member in t.get("persons_list", []):
pname = member.get("person_id_friendlyname", "").lower()
if pname:
team_members[pname] = team_name
team_role_map = {"SecOps": "referent_technique", "iPOP": "responsable_applicatif",
"Externe": "referent_technique", "DSI": "referent_technique",
"Admin DSI": "referent_technique"}
# Set des itop_id et emails vus dans le périmètre IT (pour désactiver les autres)
seen_itop_ids = set()
seen_emails = set()
stats["contacts_deactivated"] = 0
for p in persons:
fullname = f"{p.get('first_name','')} {p.get('name','')}".strip()
email = p.get("email", "")
if not email:
continue
team = team_members.get(fullname.lower(), "")
# Filtre : ne synchroniser que les persons dans le périmètre IT
if team not in it_teams_list:
continue
role = team_role_map.get(team, "referent_technique")
itop_id = p.get("itop_id")
phone = p.get("phone", "")
function = p.get("function", "")
# Status iTop : active/inactive
itop_status = (p.get("status", "") or "").lower()
is_active = itop_status != "inactive"
if itop_id:
seen_itop_ids.add(int(itop_id))
seen_emails.add(email.lower())
existing = db.execute(text("SELECT id FROM contacts WHERE LOWER(email)=LOWER(:e)"), {"e": email}).fetchone()
if existing:
db.execute(text("""UPDATE contacts SET name=:n, role=:r, itop_id=:iid,
telephone=:tel, team=:t, function=:f, is_active=:a, updated_at=NOW() WHERE id=:id"""),
{"id": existing.id, "n": fullname, "r": role, "iid": itop_id,
"tel": phone, "t": team, "f": function, "a": is_active})
else:
try:
db.execute(text("""INSERT INTO contacts (name, email, role, itop_id,
telephone, team, function, is_active) VALUES (:n, :e, :r, :iid, :tel, :t, :f, :a)"""),
{"n": fullname, "e": email, "r": role, "iid": itop_id,
"tel": phone, "t": team, "f": function, "a": is_active})
stats["contacts"] += 1
except Exception:
db.rollback()
# Désactiver les contacts iTop qui ne sont plus dans le périmètre (plus dans les teams IT)
# Critère : a un itop_id mais n'a pas été vu dans le sync
if seen_itop_ids:
placeholders = ",".join(str(i) for i in seen_itop_ids)
r = db.execute(text(f"""UPDATE contacts SET is_active=false, updated_at=NOW()
WHERE itop_id IS NOT NULL AND itop_id NOT IN ({placeholders}) AND is_active=true"""))
stats["contacts_deactivated"] = r.rowcount
# Désactiver les users PatchCenter liés à des contacts devenus inactifs
stats["users_deactivated"] = 0
r = db.execute(text("""UPDATE users SET is_active=false, updated_at=NOW()
WHERE is_active=true AND itop_person_id IN
(SELECT itop_id FROM contacts WHERE is_active=false AND itop_id IS NOT NULL)"""))
stats["users_deactivated"] = r.rowcount
# ─── 6. Build lookup maps ───
domain_map = {r.name.lower(): r.id for r in db.execute(text("SELECT id, name FROM domains")).fetchall()}
env_map = {r.name.lower(): r.id for r in db.execute(text("SELECT id, name FROM environments")).fetchall()}
zone_map = {r.name.lower(): r.id for r in db.execute(text("SELECT id, name FROM zones")).fetchall()}
# Person name → email lookup
person_email = {}
for p in persons:
fullname = f"{p.get('first_name','')} {p.get('name','')}".strip()
person_email[fullname.lower()] = p.get("email", "")
# ─── 7. Pre-collect responsables per domaine×env (most frequent wins) ───
# Will be populated during VM processing, then used to update domain_environments
de_responsables = defaultdict(lambda: {"resp_dom": defaultdict(int), "resp_dom_email": {},
"referent": defaultdict(int), "referent_email": {}})
# ─── 7bis. ApplicationSolutions ───
crit_map = {"high": "haute", "critical": "critique", "medium": "standard", "low": "basse"}
itop_apps = client.get_all("ApplicationSolution", "name,description,business_criticity,status")
for app in itop_apps:
iid = app.get("itop_id")
name = (app.get("name") or "")[:50]
full = (app.get("name") or "")[:200]
desc = (app.get("description") or "")[:500]
crit = crit_map.get((app.get("business_criticity") or "").lower(), "basse")
st = (app.get("status") or "active")[:30]
try:
db.execute(text("""INSERT INTO applications (itop_id, nom_court, nom_complet, description, criticite, status)
VALUES (:iid, :n, :nc, :d, :c, :s)
ON CONFLICT (itop_id) DO UPDATE SET nom_court=EXCLUDED.nom_court,
nom_complet=EXCLUDED.nom_complet, description=EXCLUDED.description,
criticite=EXCLUDED.criticite, status=EXCLUDED.status, updated_at=NOW()"""),
{"iid": iid, "n": name, "nc": full, "d": desc, "c": crit, "s": st})
stats["applications"] = stats.get("applications", 0) + 1
except Exception:
db.rollback()
db.commit()
app_by_itop_id = {r.itop_id: r.id for r in db.execute(text(
"SELECT id, itop_id FROM applications WHERE itop_id IS NOT NULL")).fetchall()}
# ─── 8. VirtualMachines ───
vms = client.get_all("VirtualMachine",
"name,description,status,managementip,osfamily_id_friendlyname,"
"osversion_id_friendlyname,organization_name,cpu,ram,"
"responsable_serveur_name,responsable_domaine_name,"
"environnement_name,domaine_applicatif_name,zone_name,"
"contacts_list,virtualhost_name,business_criticity,"
"tier_name,connexion_method_name,ssh_user_name,"
"patch_frequency_name,pref_patch_jour_name,patch_window,"
"patch_excludes,domain_ldap_name,last_patch_date,"
"applicationsolution_list")
# PatchCenter etat = label iTop verbatim (Production, Implémentation, Stock, Obsolète, EOL, prêt, tests, Nouveau, ...)
itop_status = {
"production": "Production", "implementation": "Implémentation",
"stock": "Stock", "obsolete": "Obsolète", "eol": "EOL",
"pret": "prêt", "tests": "tests", "nouveau": "Nouveau",
"casse": "Cassé", "cede": "Cédé", "en_panne": "En panne",
"a_recuperer": "A récupérer", "perdu": "Perdu",
"recycle": "Recyclé", "occasion": "Occasion",
"a_detruire": "A détruire", "vole": "Volé",
}
for v in vms:
hostname = v.get("name", "").split(".")[0].lower()
if not hostname:
continue
# Resolve domain_env_id
dom = v.get("domaine_applicatif_name", "").lower()
env = v.get("environnement_name", "").lower()
de_id = None
if dom in domain_map and env in env_map:
did, eid = domain_map[dom], env_map[env]
row = db.execute(text("SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e"),
{"d": did, "e": eid}).fetchone()
if row:
de_id = row.id
else:
try:
db.execute(text("INSERT INTO domain_environments (domain_id, environment_id) VALUES (:d, :e)"),
{"d": did, "e": eid})
db.commit()
row = db.execute(text("SELECT id FROM domain_environments WHERE domain_id=:d AND environment_id=:e"),
{"d": did, "e": eid}).fetchone()
de_id = row.id if row else None
except Exception:
db.rollback()
# Collect responsables for this domain×env
if de_id:
resp_dom = v.get("responsable_domaine_name", "")
if resp_dom:
de_responsables[de_id]["resp_dom"][resp_dom] += 1
de_responsables[de_id]["resp_dom_email"][resp_dom] = person_email.get(resp_dom.lower(), "")
zone_id = zone_map.get(v.get("zone_name", "").lower())
tier_raw = v.get("tier_name", "")
tier = tier_raw.lower().replace(" ", "") if tier_raw else "a_definir"
ssh_method = v.get("connexion_method_name", "") or "ssh_key"
patch_freq = (v.get("patch_frequency_name", "") or "").lower() or None
pref_jour = (v.get("pref_patch_jour_name", "") or "").lower() or "indifferent"
pref_heure = v.get("patch_window", "") or "indifferent"
resp_srv_name = v.get("responsable_serveur_name", "")
resp_dom_name = v.get("responsable_domaine_name", "")
# ApplicationSolution (première app si plusieurs)
app_id = None
app_name = None
apps_list = v.get("applicationsolution_list") or []
if apps_list:
first = apps_list[0]
try:
itop_aid = int(first.get("applicationsolution_id", 0))
app_id = app_by_itop_id.get(itop_aid)
app_name = first.get("applicationsolution_name", "")
except (ValueError, TypeError):
pass
vals = {
"hostname": hostname, "fqdn": v.get("name", hostname),
"os_family": "linux" if "linux" in v.get("osfamily_id_friendlyname", "").lower() else "windows",
"os_version": v.get("osversion_id_friendlyname", ""),
"machine_type": "vm",
"etat": itop_status.get(v.get("status", ""), "Production"),
"de_id": de_id, "zone_id": zone_id,
"resp_srv": resp_srv_name,
"resp_srv_email": person_email.get(resp_srv_name.lower(), ""),
"resp_dom": resp_dom_name,
"resp_dom_email": person_email.get(resp_dom_name.lower(), ""),
"desc": v.get("description", ""),
"ip": v.get("managementip", ""),
"tier": tier, "ssh_method": ssh_method,
"ssh_user": v.get("ssh_user_name", "") or "root",
"patch_freq": patch_freq, "patch_excludes": v.get("patch_excludes", ""),
"domain_ltd": v.get("domain_ldap_name", ""),
"pref_jour": pref_jour, "pref_heure": pref_heure,
"app_id": app_id, "app_name": app_name,
}
existing = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), {"h": hostname}).fetchone()
if existing:
db.execute(text("""UPDATE servers SET fqdn=:fqdn, os_family=:os_family, os_version=:os_version,
etat=:etat, domain_env_id=:de_id, zone_id=:zone_id,
responsable_nom=:resp_srv, responsable_email=:resp_srv_email,
referent_nom=:resp_dom, referent_email=:resp_dom_email, commentaire=:desc,
tier=:tier, ssh_method=:ssh_method, ssh_user=:ssh_user,
patch_frequency=:patch_freq, patch_excludes=:patch_excludes,
domain_ltd=:domain_ltd, pref_patch_jour=:pref_jour, pref_patch_heure=:pref_heure,
application_id=:app_id, application_name=:app_name,
updated_at=NOW() WHERE id=:sid"""), {**vals, "sid": existing.id})
if vals["ip"]:
_upsert_ip(db, existing.id, vals["ip"])
stats["ips"] += 1
stats["servers_updated"] += 1
else:
try:
db.execute(text("""INSERT INTO servers (hostname, fqdn, os_family, os_version, machine_type,
etat, domain_env_id, zone_id, responsable_nom, responsable_email,
referent_nom, referent_email, commentaire,
ssh_port, ssh_user, ssh_method, tier, patch_frequency, patch_excludes,
domain_ltd, pref_patch_jour, pref_patch_heure)
VALUES (:hostname, :fqdn, :os_family, :os_version, :machine_type,
:etat, :de_id, :zone_id, :resp_srv, :resp_srv_email,
:resp_dom, :resp_dom_email, :desc,
22, :ssh_user, :ssh_method, :tier, :patch_freq, :patch_excludes,
:domain_ltd, :pref_jour, :pref_heure)"""), vals)
db.flush()
if vals["ip"]:
new_srv = db.execute(text("SELECT id FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone()
if new_srv:
_upsert_ip(db, new_srv.id, vals["ip"])
stats["ips"] += 1
stats["servers_created"] += 1
except Exception as e:
db.rollback()
stats["errors"].append(f"VM {hostname}: {str(e)[:80]}")
# ─── 8b. Update domain_environments with most frequent responsables ───
for de_id, resps in de_responsables.items():
updates = {}
if resps["resp_dom"]:
top_resp = max(resps["resp_dom"], key=resps["resp_dom"].get)
updates["responsable_nom"] = top_resp
updates["responsable_email"] = resps["resp_dom_email"].get(top_resp, "")
db.execute(text("""UPDATE domain_environments SET
responsable_nom=:responsable_nom, responsable_email=:responsable_email
WHERE id=:id"""),
{"id": de_id, **updates}) if updates else None
# ─── 9. Physical Servers ───
phys = client.get_all("Server",
"name,description,status,managementip,osfamily_id_friendlyname,"
"osversion_id_friendlyname,contacts_list,location_name")
for s in phys:
hostname = s.get("name", "").split(".")[0].lower()
if not hostname:
continue
contacts = s.get("contacts_list", [])
resp = contacts[0].get("contact_id_friendlyname", "") if contacts else ""
ip = s.get("managementip", "")
osf = "linux" if "linux" in s.get("osfamily_id_friendlyname", "").lower() else "windows"
osv = s.get("osversion_id_friendlyname", "")
existing = db.execute(text("SELECT id FROM servers WHERE LOWER(hostname)=LOWER(:h)"), {"h": hostname}).fetchone()
if existing:
db.execute(text("""UPDATE servers SET fqdn=:f, os_family=:osf, os_version=:osv,
responsable_nom=:resp, commentaire=:desc, site=:site, updated_at=NOW()
WHERE id=:sid"""),
{"f": s.get("name", hostname), "osf": osf, "osv": osv,
"resp": resp, "desc": s.get("description", ""),
"site": s.get("location_name", ""), "sid": existing.id})
if ip:
_upsert_ip(db, existing.id, ip)
stats["servers_updated"] += 1
else:
try:
db.execute(text("""INSERT INTO servers (hostname, fqdn, os_family, os_version, machine_type,
etat, responsable_nom, commentaire, site, ssh_port, ssh_user, ssh_method, tier)
VALUES (:h, :f, :osf, :osv, 'physical', 'Production', :resp, :desc, :site,
22, 'root', 'ssh_key', 'tier0')"""),
{"h": hostname, "f": s.get("name", hostname), "osf": osf, "osv": osv,
"resp": resp, "desc": s.get("description", ""),
"site": s.get("location_name", "")})
db.flush()
if ip:
new_srv = db.execute(text("SELECT id FROM servers WHERE hostname=:h"), {"h": hostname}).fetchone()
if new_srv:
_upsert_ip(db, new_srv.id, ip)
stats["ips"] += 1
stats["servers_created"] += 1
except Exception as e:
db.rollback()
stats["errors"].append(f"Phys {hostname}: {str(e)[:80]}")
db.commit()
_save_sync_timestamp(db, "from", {k: v for k, v in stats.items() if k != "errors"})
db.commit()
log.info(f"iTop import: {stats}")
return stats
# ══════════════════════════════════════════════════════════
# EXPORT: PatchCenter → iTop
# ══════════════════════════════════════════════════════════
def sync_to_itop(db, itop_url, itop_user, itop_pass):
"""Exporte referentiel + serveurs + champs patching vers iTop"""
client = ITopClient(itop_url, itop_user, itop_pass)
stats = {"ref_created": 0, "servers_updated": 0, "servers_created": 0, "errors": []}
# ─── 1. Sync typologies: create missing in iTop ───
typo_map = [
("environments", "Environnement"),
("domains", "DomaineApplicatif"),
("zones", "Zone"),
]
for pc_table, itop_class in typo_map:
existing_itop = {item.get("name", "").lower() for item in client.get_all(itop_class, "name")}
rows = db.execute(text(f"SELECT name FROM {pc_table} ORDER BY name")).fetchall()
for row in rows:
if row.name.lower() not in existing_itop:
r = client.create(itop_class, {"name": row.name})
if r.get("code") == 0:
stats["ref_created"] += 1
existing_itop.add(row.name.lower())
else:
stats["errors"].append(f"{itop_class} '{row.name}': {r.get('message', '')[:60]}")
# ─── 2. Sync domain_ltd → DomainLdap ───
existing_ldap = {item.get("name", "").lower() for item in client.get_all("DomainLdap", "name")}
rows = db.execute(text("SELECT name FROM domain_ltd_list ORDER BY name")).fetchall()
for row in rows:
if row.name.lower() not in existing_ldap:
r = client.create("DomainLdap", {"name": row.name})
if r.get("code") == 0:
stats["ref_created"] += 1
# ─── 3. Sync servers → VirtualMachine ───
itop_vms = {}
for v in client.get_all("VirtualMachine", "name"):
itop_vms[v["name"].split(".")[0].lower()] = v
# DB (iTop label verbatim) -> iTop API internal code
status_map = {
"Production": "production", "Implémentation": "implementation",
"Stock": "stock", "Obsolète": "obsolete", "EOL": "eol",
"prêt": "pret", "tests": "tests", "Nouveau": "nouveau",
"Cassé": "casse", "Cédé": "cede", "En panne": "en_panne",
"A récupérer": "a_recuperer", "Perdu": "perdu",
"Recyclé": "recycle", "Occasion": "occasion",
"A détruire": "a_detruire", "Volé": "vole",
}
tier_map = {"tier0": "Tier 0", "tier1": "Tier 1", "tier2": "Tier 2", "tier3": "Tier 3"}
# Build OSVersion cache: name.lower() → itop_id
itop_osversions = {}
for ov in client.get_all("OSVersion", "name,osfamily_name"):
itop_osversions[ov["name"].lower()] = ov
# Build OSFamily cache
itop_osfamilies = {}
for of in client.get_all("OSFamily", "name"):
itop_osfamilies[of["name"].lower()] = of["itop_id"]
# Build Person name → itop_id lookup for responsable sync
itop_persons = {}
for p in client.get_all("Person", "name,first_name"):
fullname = f"{p.get('first_name','')} {p.get('name','')}".strip()
itop_persons[fullname.lower()] = p["itop_id"]
rows = db.execute(text("""SELECT s.hostname, s.fqdn, s.os_version, s.os_family, s.etat, s.commentaire, s.tier,
s.ssh_method, s.ssh_user, s.patch_frequency, s.patch_excludes, s.domain_ltd,
s.pref_patch_jour, s.pref_patch_heure, s.responsable_nom, s.referent_nom,
s.application_id,
(SELECT si.ip_address::text FROM server_ips si WHERE si.server_id = s.id AND si.ip_type = 'primary' LIMIT 1) as mgmt_ip,
(SELECT sa.audit_date FROM server_audit sa WHERE sa.server_id = s.id ORDER BY sa.audit_date DESC LIMIT 1) as last_audit_date,
(SELECT a.itop_id FROM applications a WHERE a.id = s.application_id) as app_itop_id
FROM servers s WHERE s.machine_type='vm'""")).fetchall()
for srv in rows:
hostname = (srv.hostname or "").lower()
itop_vm = itop_vms.get(hostname)
fields = {}
if srv.mgmt_ip:
fields["managementip"] = srv.mgmt_ip.split("/")[0]
if srv.etat:
fields["status"] = status_map.get(srv.etat, "production") # iTop API internal code
if srv.commentaire:
fields["description"] = srv.commentaire
if srv.patch_excludes:
fields["patch_excludes"] = srv.patch_excludes
if srv.tier and srv.tier in tier_map:
fields["tier_id"] = f"SELECT Tier WHERE name = '{tier_map[srv.tier]}'"
if srv.ssh_method:
fields["connexion_method_id"] = f"SELECT ConnexionMethod WHERE name = '{srv.ssh_method}'"
if srv.ssh_user:
fields["ssh_user_id"] = f"SELECT SshUser WHERE name = '{srv.ssh_user}'"
if srv.patch_frequency:
freq = srv.patch_frequency.capitalize()
fields["patch_frequency_id"] = f"SELECT PatchFrequency WHERE name = '{freq}'"
if srv.pref_patch_jour and srv.pref_patch_jour != "indifferent":
fields["pref_patch_jour_id"] = f"SELECT PrefPatchJour WHERE name = '{srv.pref_patch_jour.capitalize()}'"
if srv.pref_patch_heure and srv.pref_patch_heure != "indifferent":
fields["patch_window"] = srv.pref_patch_heure
if srv.domain_ltd:
fields["domain_ldap_id"] = f"SELECT DomainLdap WHERE name = '{srv.domain_ltd}'"
# Date dernier audit
if srv.last_audit_date:
fields["last_patch_date"] = str(srv.last_audit_date)[:10]
# ApplicationSolution : pousser si défini (replace la liste)
if srv.app_itop_id:
fields["applicationsolution_list"] = [{"applicationsolution_id": int(srv.app_itop_id)}]
# OS version — chercher/créer dans iTop
if srv.os_version:
osv_name = _normalize_os_for_itop(srv.os_version)
osf_name = "Linux" if srv.os_family == "linux" else "Windows" if srv.os_family == "windows" else "Linux"
match = itop_osversions.get(osv_name.lower())
if match:
fields["osversion_id"] = match["itop_id"]
else:
# Créer l'OSVersion dans iTop
osf_id = itop_osfamilies.get(osf_name.lower())
if osf_id:
cr = client.create("OSVersion", {"name": osv_name, "osfamily_id": osf_id})
if cr.get("code") == 0 and cr.get("objects"):
new_id = list(cr["objects"].values())[0]["key"]
fields["osversion_id"] = new_id
itop_osversions[osv_name.lower()] = {"itop_id": new_id, "name": osv_name}
stats["ref_created"] += 1
# Responsable serveur
if srv.responsable_nom:
pid = itop_persons.get(srv.responsable_nom.lower())
if pid:
fields["responsable_serveur_id"] = pid
# Responsable domaine
if srv.referent_nom:
pid = itop_persons.get(srv.referent_nom.lower())
if pid:
fields["responsable_domaine_id"] = pid
if itop_vm:
if fields:
r = client.update("VirtualMachine", itop_vm["itop_id"], fields)
if r.get("code") == 0:
stats["servers_updated"] += 1
else:
stats["errors"].append(f"Update {hostname}: {r.get('message', '')[:60]}")
else:
fields["name"] = srv.hostname
fields["org_id"] = "SELECT Organization WHERE name = 'MPCZ'"
if not fields.get("status"):
fields["status"] = "production"
r = client.create("VirtualMachine", fields)
if r.get("code") == 0:
stats["servers_created"] += 1
else:
stats["errors"].append(f"Create {hostname}: {r.get('message', '')[:60]}")
db.commit()
_save_sync_timestamp(db, "to", {k: v for k, v in stats.items() if k != "errors"})
db.commit()
log.info(f"iTop export: {stats}")
return stats