align_from_ayoub: dedoublonne les domaines case/accent-insensitive

Fusionne 'Flux Libre'/'flux libre', 'Péage'/'peage'/'PeagE' en gardant la
forme propre (avec accents et capitale). Met à jour domain_environments.domain_id
vers le domaine conservé et supprime les doublons.
This commit is contained in:
Pierre & Lumière 2026-04-14 19:32:28 +02:00
parent 991f4dd6dc
commit 2379a2fdc0

View File

@ -82,13 +82,25 @@ def slugify(s, maxlen=10):
return ascii_str[:maxlen] or None return ascii_str[:maxlen] or None
def norm_domain_key(s):
    """Return the comparison key used to detect duplicate domain names.

    The key is built by trimming surrounding whitespace, decomposing the
    text with NFKD, dropping combining marks (accents) and case-folding the
    result, so that "Péage", "peage" and " PEAGE " all map to "peage".

    Args:
        s: Domain name; may be None or empty.

    Returns:
        The normalized key, or "" when ``s`` is falsy.
    """
    if not s:
        return ""
    decomposed = unicodedata.normalize("NFKD", s.strip())
    without_marks = "".join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    )
    # casefold() rather than lower(): it is the form intended for caseless
    # matching and also equates spellings such as "Straße" and "STRASSE".
    return without_marks.casefold()
def get_or_create_domain(conn, name): def get_or_create_domain(conn, name):
row = conn.execute(text("SELECT id, code FROM domains WHERE name=:n"), # Match case/accent-insensitive pour fusionner "Péage" et "peage"
{"n": name}).fetchone() key = norm_domain_key(name)
if row: existing = conn.execute(text("SELECT id FROM domains")).fetchall()
return row.id for r in existing:
r_name = conn.execute(text("SELECT name FROM domains WHERE id=:i"),
{"i": r.id}).fetchone().name
if norm_domain_key(r_name) == key:
return r.id
code = slugify(name, 10) code = slugify(name, 10)
# Eviter collision de code
suffix = 0 suffix = 0
base_code = code base_code = code
while conn.execute(text("SELECT 1 FROM domains WHERE code=:c"), while conn.execute(text("SELECT 1 FROM domains WHERE code=:c"),
@ -102,6 +114,32 @@ def get_or_create_domain(conn, name):
{"n": name}).fetchone().id {"n": name}).fetchone().id
def _domain_keeper_rank(item):
    """Sort key ranking an (id, name) pair; the smallest rank is kept."""
    dom_id, dom_name = item
    has_capital = dom_name != dom_name.lower()
    decomposed = unicodedata.normalize("NFKD", dom_name)
    has_accent = any(unicodedata.combining(ch) for ch in decomposed)
    # Capitalised spellings first, then accented ones, then the oldest id.
    return (not has_capital, not has_accent, dom_id)


def merge_domain_duplicates(conn, dry_run=False):
    """Merge domains whose names differ only by case, accents or padding.

    Domains are grouped by ``norm_domain_key(name)``. In each group of two
    or more rows one "keeper" is chosen (a spelling with a capital letter
    is preferred, then one with accents, then the lowest id). Every other
    row of the group is reported with a ``[MERGE]`` line and, unless
    ``dry_run`` is set, its ``domain_environments`` rows are repointed to
    the keeper before the duplicate domain is deleted.

    Args:
        conn: Connection used to run the SQL statements.
        dry_run: When true, only print what would be merged.

    Returns:
        The number of duplicate domains found (counted in dry-run mode too).
    """
    rows = conn.execute(text("SELECT id, name FROM domains ORDER BY id")).fetchall()
    groups = {}
    for r in rows:
        key = norm_domain_key(r.name)
        if not key:
            # NULL or blank names all normalize to "": they carry no
            # identity to merge on, and a None name cannot be ranked.
            continue
        groups.setdefault(key, []).append((r.id, r.name))

    merged = 0
    for items in groups.values():
        if len(items) <= 1:
            continue
        ranked = sorted(items, key=_domain_keeper_rank)
        keeper_id, keeper_name = ranked[0]
        for dup_id, dup_name in ranked[1:]:
            print(f" [MERGE] domain {dup_name!r} (id={dup_id}) -> {keeper_name!r} (id={keeper_id})")
            if not dry_run:
                conn.execute(text(
                    "UPDATE domain_environments SET domain_id=:k WHERE domain_id=:d"
                ), {"k": keeper_id, "d": dup_id})
                # NOTE(review): only domain_environments is repointed before
                # the delete; confirm no other table references domains.id.
                conn.execute(text("DELETE FROM domains WHERE id=:d"), {"d": dup_id})
            merged += 1
    return merged
def get_or_create_env(conn, name): def get_or_create_env(conn, name):
row = conn.execute(text("SELECT id FROM environments WHERE name=:n"), row = conn.execute(text("SELECT id FROM environments WHERE name=:n"),
{"n": name}).fetchone() {"n": name}).fetchone()
@ -148,6 +186,14 @@ def main():
conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT") conn = engine.connect().execution_options(isolation_level="AUTOCOMMIT")
# 0. Fusion prealable des doublons de domaines (Flux Libre / flux libre, Peage / Péage)
print("\n[INFO] Fusion doublons domains (case/accent-insensitive)...")
merged = merge_domain_duplicates(conn, dry_run=args.dry_run)
if merged:
print(f"[INFO] {merged} doublons {'(DRY) ' if args.dry_run else ''}fusionnes")
else:
print("[INFO] Pas de doublon detecte")
wb = openpyxl.load_workbook(args.xlsx_path, data_only=True) wb = openpyxl.load_workbook(args.xlsx_path, data_only=True)
if args.sheet not in wb.sheetnames: if args.sheet not in wb.sheetnames:
print(f"[ERR] Sheet '{args.sheet}' introuvable. Sheets: {wb.sheetnames}") print(f"[ERR] Sheet '{args.sheet}' introuvable. Sheets: {wb.sheetnames}")
@ -212,13 +258,18 @@ def main():
de_id = srv.domain_env_id de_id = srv.domain_env_id
if dom_name and env_name: if dom_name and env_name:
if dom_name not in seen_dom: if dom_name not in seen_dom:
existing = conn.execute(text("SELECT id FROM domains WHERE name=:n"), # Match case/accent-insensitive
{"n": dom_name}).fetchone() key = norm_domain_key(dom_name)
if existing: existing_id = None
seen_dom[dom_name] = existing.id for r in conn.execute(text("SELECT id, name FROM domains")).fetchall():
if norm_domain_key(r.name) == key:
existing_id = r.id
break
if existing_id:
seen_dom[dom_name] = existing_id
elif args.dry_run: elif args.dry_run:
would_create_dom.add(dom_name) would_create_dom.add(dom_name)
seen_dom[dom_name] = -1 # sera cree au vrai run seen_dom[dom_name] = -1
else: else:
seen_dom[dom_name] = get_or_create_domain(conn, dom_name) seen_dom[dom_name] = get_or_create_domain(conn, dom_name)
stats["dom_created"] += 1 stats["dom_created"] += 1