# Listing metadata: 224 lines, 8.4 KiB, Python
"""Import the members of an AD group into the users table and link them to contacts.

Three linked data sets:
  1. LDAP/AD  (source: one specific AD group, e.g. CN=secops,...)
  2. contacts (by email: reuse an existing match, create it when absent)
  3. users    (by username=sAMAccountName, auth_type='ldap')
     + users.itop_person_id = contacts.itop_id (when a contact matches)

The default target AD group is:
    CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe

The LDAP configuration (server, bind DN, bind password, base DN) is read from
app_secrets via ldap_service.

Usage (must run from the SANEF workstation, because AD is not reachable from the lab):
    python tools/import_ldap_group_users.py
    python tools/import_ldap_group_users.py --group "CN=secops,OU=...,DC=sanef,DC=groupe" --dry-run
"""

import os
|
|
import sys
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import create_engine, text
|
|
|
|
# Parent of the directory that holds this script; pushed to the front of
# sys.path so the function-level `app.*` import in this file can resolve.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

# Connection string lookup order: DATABASE_URL_DEMO, then DATABASE_URL, then
# the built-in local default.
# NOTE(review): the default embeds a credential in source control — consider
# requiring the environment variable instead.
DATABASE_URL = (
    os.environ.get("DATABASE_URL_DEMO")
    or os.environ.get("DATABASE_URL")
    or "postgresql://patchcenter:PatchCenter2026!@localhost:5432/patchcenter_db"
)

# Group DN used when --group is not supplied on the command line.
DEFAULT_GROUP_DN = "CN=secops,OU=Groupes d administration,OU=Administration,DC=sanef,DC=groupe"


def get_ldap_config(engine):
    """Return the LDAP settings produced by ``ldap_service._get_config``.

    A connection is opened on ``engine`` only for the duration of the call
    and handed to the service helper; its result is returned unchanged.
    (Original note: the settings are stored in app_secrets.)
    """
    from app.services.ldap_service import _get_config as load_ldap_settings

    with engine.connect() as db:
        settings = load_ldap_settings(db)
    return settings


def fetch_group_members(cfg, group_dn):
    """Return the direct user members of an AD group.

    Binds with the service account from ``cfg`` and runs a subtree search
    under ``cfg["base_dn"]`` for person/user objects whose ``memberOf``
    equals ``group_dn``. Searching users by ``memberOf`` is used instead of
    reading the group's ``member`` attribute (original author's note: that
    attribute is limited to 1500 DNs by default).

    Args:
        cfg: LDAP settings mapping. Keys read here: ``server``, ``bind_dn``,
            ``bind_pwd`` and ``base_dn``.
        group_dn: Distinguished name of the target group. It is escaped
            before being embedded in the search filter.

    Returns:
        A list of dicts with the keys ``username`` (lower-cased
        sAMAccountName), ``display_name``, ``email`` and ``dn``.

    Raises:
        Nothing is caught here: ldap3 errors from connecting, binding or
        searching propagate to the caller. The connection is unbound in all
        cases once it has been established.
    """
    from ldap3 import Server, Connection, ALL, SUBTREE
    from ldap3.utils.conv import escape_filter_chars

    use_ssl = cfg["server"].startswith("ldaps://")
    server = Server(cfg["server"], get_info=ALL, use_ssl=use_ssl)
    conn = Connection(server, user=cfg["bind_dn"], password=cfg["bind_pwd"],
                      auto_bind=True)

    # LDAP filter: direct members of the group (includes admin accounts, even
    # those without a mail attribute). The DN is escaped so that characters
    # that are special inside a filter ("(", ")", "*", "\") cannot break or
    # alter the query.
    search_filter = (
        "(&(objectClass=user)(objectCategory=person)"
        f"(memberOf={escape_filter_chars(group_dn)}))"
    )

    members = []
    try:
        # NOTE(review): this is a single, non-paged search; if the directory
        # caps the size of one result set, very large groups may come back
        # truncated — confirm, or switch to a paged search.
        conn.search(cfg["base_dn"], search_filter, search_scope=SUBTREE,
                    attributes=["sAMAccountName", "displayName", "mail",
                                "userPrincipalName", "distinguishedName",
                                "userAccountControl"])

        for entry in conn.entries:
            sam = str(entry.sAMAccountName) if entry.sAMAccountName else None
            if not sam:
                print(f" [SKIP] Entry sans sAMAccountName : {entry.entry_dn}")
                continue

            # Email priority: mail > userPrincipalName > fallback sam@sanef.com
            email = None
            if entry.mail and str(entry.mail).strip():
                email = str(entry.mail).strip().lower()
            elif entry.userPrincipalName and str(entry.userPrincipalName).strip():
                email = str(entry.userPrincipalName).strip().lower()
            else:
                email = f"{sam.lower()}@sanef.com"
                print(f" [INFO] {sam} sans mail AD, fallback : {email}")

            # Bit 0x2 of userAccountControl marks a disabled account; this is
            # informational only, the member is still imported.
            uac = entry.userAccountControl.value if entry.userAccountControl else 0
            if isinstance(uac, int) and uac & 0x2:
                print(f" [WARN] {sam} compte AD DESACTIVE (UAC={uac}) — importe quand meme")

            members.append({
                "username": sam.lower(),
                "display_name": str(entry.displayName) if entry.displayName else sam,
                "email": email,
                "dn": str(entry.entry_dn),
            })
    finally:
        # Always release the bound connection, even when the search or the
        # entry processing raises.
        conn.unbind()
    return members


# Look up a contact by e-mail, compared against lower(email); callers pass an
# already lower-cased address.
SQL_FIND_CONTACT = text("""
    SELECT id, itop_id FROM contacts WHERE lower(email) = :email LIMIT 1
""")

# Create a contact (role 'contact_technique', team 'SecOps'); on an e-mail
# conflict refresh name and ldap_dn instead. Returns (id, itop_id) either way.
SQL_INSERT_CONTACT = text("""
    INSERT INTO contacts (name, email, role, team, ldap_dn,
                          is_active, is_verified, created_at, updated_at)
    VALUES (:name, :email, 'contact_technique', 'SecOps', :ldap_dn,
            true, true, now(), now())
    ON CONFLICT (email) DO UPDATE SET
        name = EXCLUDED.name,
        ldap_dn = EXCLUDED.ldap_dn,
        updated_at = now()
    RETURNING id, itop_id
""")

# Look up a user id by exact username.
SQL_FIND_USER = text("""
    SELECT id FROM users WHERE username = :username LIMIT 1
""")

# Create an active LDAP-authenticated user with the 'operator' role, linked
# to its contact and (when known) its iTop person id.
SQL_INSERT_USER = text("""
    INSERT INTO users (username, display_name, email, role, auth_type,
                       is_active, contact_id, itop_person_id,
                       created_at, updated_at)
    VALUES (:username, :display_name, :email, 'operator', 'ldap',
            true, :contact_id, :itop_pid,
            now(), now())
""")

# Refresh an existing user. is_active is deliberately left untouched so a
# manually disabled user is never re-enabled; the COALESCE calls keep the
# current links when the new value is NULL.
SQL_UPDATE_USER = text("""
    UPDATE users SET
        display_name = :display_name,
        email = :email,
        auth_type = 'ldap',
        -- is_active PRESERVE : jamais reactive un user desactive manuellement
        contact_id = COALESCE(:contact_id, contact_id),
        itop_person_id = COALESCE(:itop_pid, itop_person_id),
        updated_at = now()
    WHERE id = :uid
""")


def main():
    """CLI entry point: import one AD group's members into contacts and users.

    Flow: parse ``--group`` / ``--dry-run``; load the LDAP settings; fetch
    the group members and print a preview of the first five; then, unless
    ``--dry-run`` was given, upsert one contact and one user per member
    inside a single transaction and print the counters.

    Exits with status 1 when LDAP is disabled or lacks a server/base DN, and
    with status 2 when fetching the members raises.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--group", default=DEFAULT_GROUP_DN,
                     help=f"DN du groupe AD (defaut: {DEFAULT_GROUP_DN})")
    cli.add_argument("--dry-run", action="store_true")
    opts = cli.parse_args()

    print(f"[INFO] Groupe cible : {opts.group}")
    engine = create_engine(DATABASE_URL)
    # Show only what follows the last '@' so the credentials stay out of the output.
    db_target = DATABASE_URL.rsplit('@', 1)[-1]
    print(f"[INFO] DB: {db_target}")

    cfg = get_ldap_config(engine)
    if not cfg["enabled"]:
        print("[ERR] LDAP desactive dans app_secrets (ldap_enabled != true).")
        sys.exit(1)
    if not (cfg["server"] and cfg["base_dn"]):
        print("[ERR] LDAP non configure (server ou base_dn manquant).")
        sys.exit(1)
    print(f"[INFO] LDAP server : {cfg['server']} base_dn : {cfg['base_dn']}")

    try:
        members = fetch_group_members(cfg, opts.group)
    except Exception as exc:
        print(f"[ERR] LDAP search failed : {exc}")
        sys.exit(2)

    # Preview: at most the first five members, then a count of the rest.
    preview_size = 5
    total = len(members)
    print(f"[INFO] Membres AD retrouves : {total}")
    for person in members[:preview_size]:
        print(f" {person['username']:20s} {person['email']:40s} {person['display_name']}")
    if total > preview_size:
        print(f" ... ({total - preview_size} autres)")

    if opts.dry_run:
        print("[DRY-RUN] Aucun write")
        return

    users_inserted = 0
    users_updated = 0
    contacts_created = 0
    contact_links = 0
    itop_links = 0

    # Records the AD DN on an already existing contact, only when none is stored yet.
    sql_trace_ldap_dn = text(
        "UPDATE contacts SET ldap_dn = COALESCE(ldap_dn, :dn) WHERE id = :cid"
    )

    # One transaction for the whole import: committed on success, rolled back on error.
    with engine.begin() as tx:
        for person in members:
            # 1. Contact: find or create (ldap_dn is stored to trace the AD source).
            found = tx.execute(SQL_FIND_CONTACT, {"email": person["email"]}).fetchone()
            if found:
                contact_id, itop_id = found
                tx.execute(sql_trace_ldap_dn, {"dn": person["dn"], "cid": contact_id})
            else:
                new_contact = {
                    "name": person["display_name"],
                    "email": person["email"],
                    "ldap_dn": person["dn"],
                }
                contact_id, itop_id = tx.execute(SQL_INSERT_CONTACT, new_contact).fetchone()
                contacts_created += 1

            # 2. User: upsert, linking contact_id (FK) and itop_person_id (history).
            existing = tx.execute(SQL_FIND_USER, {"username": person["username"]}).fetchone()
            user_fields = {
                "username": person["username"],
                "display_name": person["display_name"],
                "email": person["email"],
                "contact_id": contact_id,
                "itop_pid": itop_id,  # None when the contact has no itop_id
            }
            if existing:
                tx.execute(SQL_UPDATE_USER, {**user_fields, "uid": existing[0]})
                users_updated += 1
            else:
                tx.execute(SQL_INSERT_USER, user_fields)
                users_inserted += 1

            contact_links += 1
            if itop_id:
                itop_links += 1

    print("[OK] Termine :")
    print(f" users : INSERT {users_inserted} UPDATE {users_updated}")
    print(f" contacts : CREATE {contacts_created}")
    print(f" links : users.contact_id = {contact_links}")
    print(f" users.itop_person_id = {itop_links}")


# Run the import only when executed as a script, not on import.
if __name__ == "__main__":
    main()