"""Server business logic: SQL queries kept separate from the router."""
from sqlalchemy import text


def get_server_full(db, server_id):
    """Return a single server row with its joined reference data.

    Args:
        db: Database session/connection exposing ``execute()``.
        server_id: Value matched against ``servers.id``.

    Returns:
        The first matching row (all ``servers`` columns plus ``domaine``,
        ``domaine_code``, ``env_code``, ``zone`` and ``qid``), or ``None``
        when no server has that id.
    """
    return db.execute(text("""
        SELECT s.*, d.name as domaine, d.code as domaine_code,
               e.code as env_code, z.name as zone, s.qualys_asset_id as qid
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN zones z ON s.zone_id = z.id
        WHERE s.id = :id
    """), {"id": server_id}).fetchone()


def get_server_tags(db, qualys_asset_id):
    """Return the Qualys tag names attached to an asset, sorted by name.

    Args:
        db: Database session/connection exposing ``execute()``.
        qualys_asset_id: Qualys asset identifier; a falsy value
            short-circuits to an empty list without querying.

    Returns:
        A list of tag names (possibly empty).
    """
    if not qualys_asset_id:
        return []
    rows = db.execute(text("""
        SELECT qt.name
        FROM qualys_asset_tags qat
        JOIN qualys_tags qt ON qat.qualys_tag_id = qt.qualys_tag_id
        WHERE qat.qualys_asset_id = :aid
        ORDER BY qt.name
    """), {"aid": qualys_asset_id}).fetchall()
    return [r.name for r in rows]


def get_server_ips(db, server_id):
    """Return a server's IPs split into real IP, connection IP and others.

    Args:
        db: Database session/connection exposing ``execute()``.
        server_id: Value matched against ``server_ips.server_id``.

    Returns:
        A dict with keys ``ip_reelle`` (first row whose ``ip_type`` is
        ``'primary'``, or ``None``), ``ip_connexion`` (first row flagged
        ``is_ssh``, or ``None``) and ``autres_ips`` (list of dicts for rows
        that are neither primary nor SSH).
    """
    rows = db.execute(text("""
        SELECT ip_address, ip_type, is_ssh, interface, description
        FROM server_ips
        WHERE server_id = :id
        ORDER BY ip_type, ip_address
    """), {"id": server_id}).fetchall()

    ip_reelle = None
    ip_connexion = None
    autres_ips = []
    for r in rows:
        ip = str(r.ip_address)
        is_primary = (r.ip_type == 'primary')
        is_ssh = r.is_ssh
        # Only the first primary / first SSH row (in query order) is kept.
        if is_primary and not ip_reelle:
            ip_reelle = ip
        if is_ssh and not ip_connexion:
            ip_connexion = ip
        if not is_primary and not is_ssh:
            autres_ips.append({
                "ip": ip,
                "type": r.ip_type,
                "interface": r.interface or "",
                "description": r.description or "",
            })
    return {
        "ip_reelle": ip_reelle,
        "ip_connexion": ip_connexion,
        "autres_ips": autres_ips,
    }


def update_server_ips(db, server_id, ip_reelle, ip_connexion):
    """Replace the real (primary) IP and the connection (SSH) IP of a server.

    All existing rows that are primary or flagged SSH are deleted first, then
    up to two rows are inserted. Does not commit; the caller owns the
    transaction.

    Args:
        db: Database session/connection exposing ``execute()``.
        server_id: Value written to / matched against ``server_ips.server_id``.
        ip_reelle: Real IP; ``None`` or blank means "none".
        ip_connexion: Connection IP; ``None`` or blank means "none".
    """
    ip_reelle = (ip_reelle or "").strip()
    ip_connexion = (ip_connexion or "").strip()

    # Remove the previous primary / SSH entries.
    db.execute(text(
        "DELETE FROM server_ips "
        "WHERE server_id = :sid AND (ip_type = 'primary' OR is_ssh = true)"
    ), {"sid": server_id})

    if not ip_reelle and not ip_connexion:
        return

    if ip_reelle == ip_connexion:
        # Same address for both roles: a single row that is primary and SSH.
        db.execute(text("""
            INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh)
            VALUES (:sid, :ip, 'primary', true)
        """), {"sid": server_id, "ip": ip_reelle})
    else:
        if ip_reelle:
            db.execute(text("""
                INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh)
                VALUES (:sid, :ip, 'primary', false)
            """), {"sid": server_id, "ip": ip_reelle})
        if ip_connexion:
            db.execute(text("""
                INSERT INTO server_ips (server_id, ip_address, ip_type, is_ssh)
                VALUES (:sid, :ip, 'secondary', true)
            """), {"sid": server_id, "ip": ip_connexion})


# Whitelist mapping public sort keys to SQL column expressions. Only these
# values are ever interpolated into ORDER BY, never the raw request value.
SORT_COLS = {
    "hostname": "s.hostname",
    "env": "s.environnement",
    "domaine": "d.name",
    "tier": "s.tier",
    "etat": "s.etat",
    "os": "s.os_family",
    "owner": "s.patch_os_owner",
    "zone": "z.name",
}


def list_servers(db, filters, page=1, per_page=50, sort="hostname", sort_dir="asc"):
    """Return one page of servers, filtered and sorted, plus the total count.

    Args:
        db: Database session/connection exposing ``execute()``.
        filters: Mapping of optional filter values. Recognised keys:
            ``domain``, ``env``, ``tier``, ``etat``, ``os``, ``zone``,
            ``owner``, ``licence``, ``application_id``, ``application`` and
            ``search``. For ``env``, ``etat``, ``zone`` and ``licence`` the
            special value ``"__null__"`` selects rows where the column is
            unset. ``application`` is ignored when ``application_id`` is set.
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size, used as the SQL ``LIMIT``.
        sort: Key of ``SORT_COLS``; unknown keys fall back to hostname.
        sort_dir: ``"desc"`` for descending, anything else is ascending.

    Returns:
        A ``(servers, total)`` tuple: the rows of the requested page and the
        number of rows matching the filters across all pages.
    """
    # A page below 1 would produce a negative OFFSET, which the database
    # rejects; clamp to the first page instead.
    page = max(page, 1)
    offset = (page - 1) * per_page
    where = ["1=1"]
    params = {"limit": per_page, "offset": offset}

    if filters.get("domain"):
        where.append("d.code = :domain")
        params["domain"] = filters["domain"]
    if filters.get("env"):
        if filters["env"] == "__null__":
            # Parenthesised: the fragments are joined with AND, and a bare OR
            # would otherwise split the whole WHERE clause in two.
            where.append("(s.environnement IS NULL OR s.environnement = '')")
        else:
            where.append("s.environnement = :env")
            params["env"] = filters["env"]
    if filters.get("tier"):
        where.append("s.tier = :tier")
        params["tier"] = filters["tier"]
    if filters.get("etat"):
        if filters["etat"] == "__null__":
            where.append("s.etat IS NULL")
        else:
            where.append("s.etat = :etat")
            params["etat"] = filters["etat"]
    if filters.get("os"):
        where.append("s.os_family = :os")
        params["os"] = filters["os"]
    if filters.get("zone"):
        if filters["zone"] == "__null__":
            where.append("s.zone_id IS NULL")
        else:
            # Sub-select rather than z.name so the fragment is also valid in
            # the COUNT query, which does not join the zones table.
            where.append("s.zone_id = (SELECT id FROM zones WHERE name=:zone LIMIT 1)")
            params["zone"] = filters["zone"]
    if filters.get("owner"):
        where.append("s.patch_os_owner = :owner")
        params["owner"] = filters["owner"]
    if filters.get("licence"):
        if filters["licence"] == "__null__":
            where.append("s.licence_support IS NULL")
        else:
            where.append("s.licence_support = :licence")
            params["licence"] = filters["licence"]
    if filters.get("application_id"):
        where.append("s.application_id = :app_id")
        params["app_id"] = filters["application_id"]
    elif filters.get("application"):
        # Match either the exact application_name, or an application from the
        # catalogue whose short or full name matches case-insensitively.
        where.append("""(s.application_name = :application
            OR s.application_id IN (SELECT id FROM applications
                WHERE LOWER(nom_court) = LOWER(:application)
                   OR LOWER(nom_complet) = LOWER(:application)))""")
        params["application"] = filters["application"]
    if filters.get("search"):
        where.append("s.hostname ILIKE :search")
        params["search"] = f"%{filters['search']}%"

    wc = " AND ".join(where)
    order_col = SORT_COLS.get(sort, "s.hostname")
    order_dir = "DESC" if sort_dir == "desc" else "ASC"
    # Hostname as secondary key keeps pagination stable between pages.
    order_clause = f"{order_col} {order_dir}, s.hostname ASC"

    # Raw f-string: the regex backslashes (\d, \.) must reach the database
    # untouched and are not valid Python escape sequences.
    servers = db.execute(text(rf"""
        SELECT s.id, s.hostname, s.fqdn, d.name as domaine, s.environnement,
               z.name as zone, s.os_family, s.os_version, s.tier, s.etat,
               s.licence_support, s.patch_os_owner, s.responsable_nom,
               s.machine_type, s.application_name,
               CASE
                 WHEN s.os_version ILIKE '%Red Hat%' THEN 'Red Hat ' || COALESCE((regexp_match(s.os_version, '(\d+\.\d+)'))[1], '')
                 WHEN s.os_version ILIKE '%Oracle%Linux%' THEN 'Oracle ' || COALESCE((regexp_match(s.os_version, '(\d+\.\d+)'))[1], '')
                 WHEN s.os_version ILIKE '%CentOS Stream%' THEN 'CentOS Stream ' || COALESCE((regexp_match(s.os_version, '(\d+[\.\d]*)'))[1], '')
                 WHEN s.os_version ILIKE '%CentOS%' THEN 'CentOS ' || COALESCE((regexp_match(s.os_version, '(\d+[\.\d]*)'))[1], '')
                 WHEN s.os_version ILIKE '%Debian%' THEN 'Debian ' || COALESCE((regexp_match(s.os_version, '(\d+)'))[1], '')
                 WHEN s.os_version ILIKE '%Ubuntu%' THEN 'Ubuntu ' || COALESCE((regexp_match(s.os_version, '(\d+\.\d+)'))[1], '')
                 WHEN s.os_version ILIKE '%Windows Server 2022 Standard%' THEN '2022 Standard'
                 WHEN s.os_version ILIKE '%Windows Server 2022 Datacenter%' THEN '2022 Datacenter'
                 WHEN s.os_version ILIKE '%Windows Server 2019 Standard%' THEN '2019 Standard'
                 WHEN s.os_version ILIKE '%Windows Server 2019 Datacenter%' THEN '2019 Datacenter'
                 WHEN s.os_version ILIKE '%Windows Server 2016 Standard%' THEN '2016 Standard'
                 WHEN s.os_version ILIKE '%Windows Server 2016%' THEN '2016 Standard'
                 WHEN s.os_version ILIKE '%Windows Server 2012%' THEN '2012 R2'
                 WHEN s.os_version ILIKE '%Windows Server 2008%' THEN '2008 R2'
                 WHEN s.os_version ILIKE '%Windows 10 Enterprise%' THEN 'Windows 10 Ent'
                 WHEN s.os_version ILIKE '%Windows%2022%' THEN '2022 Standard'
                 WHEN s.os_version ILIKE '%Windows%2019%' THEN '2019 Standard'
                 WHEN s.os_version ILIKE '%Windows%2016%' THEN '2016 Standard'
                 ELSE LEFT(s.os_version, 25)
               END as os_short
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        LEFT JOIN zones z ON s.zone_id = z.id
        WHERE {wc}
        ORDER BY {order_clause}
        LIMIT :limit OFFSET :offset
    """), params).fetchall()

    total = db.execute(text(f"""
        SELECT COUNT(*)
        FROM servers s
        LEFT JOIN domain_environments de ON s.domain_env_id = de.id
        LEFT JOIN domains d ON de.domain_id = d.id
        LEFT JOIN environments e ON de.environment_id = e.id
        WHERE {wc}
    """), params).scalar()

    return servers, total


def update_server(db, server_id, data, username):
    """Update a server from form-like data, write an audit entry and commit.

    Args:
        db: Database session exposing ``execute()`` and ``commit()``.
        server_id: Value matched against ``servers.id``.
        data: Mapping of submitted values. ``domain_code`` + ``env_code``
            and ``zone`` are resolved to foreign keys (silently skipped when
            no matching reference row exists). ``ip_reelle`` /
            ``ip_connexion`` replace the stored IPs when at least one of the
            two keys is present. The remaining recognised keys are written
            as-is when their value is not ``None``.
        username: Stored in the audit log entry.

    Returns:
        The list of direct field names that were written.
    """
    # Domain + environment -> domain_env_id
    if data.get("domain_code") and data.get("env_code"):
        row = db.execute(text("""
            SELECT de.id
            FROM domain_environments de
            JOIN domains d ON de.domain_id = d.id
            JOIN environments e ON de.environment_id = e.id
            WHERE d.code = :dc AND e.code = :ec
        """), {"dc": data["domain_code"], "ec": data["env_code"]}).fetchone()
        if row:
            db.execute(
                text("UPDATE servers SET domain_env_id = :deid WHERE id = :id"),
                {"deid": row.id, "id": server_id},
            )

    # Zone
    if data.get("zone"):
        zrow = db.execute(
            text("SELECT id FROM zones WHERE name = :z"), {"z": data["zone"]}
        ).fetchone()
        if zrow:
            db.execute(
                text("UPDATE servers SET zone_id = :zid WHERE id = :id"),
                {"zid": zrow.id, "id": server_id},
            )

    # IPs (real + connection). update_server_ips() always deletes the current
    # primary/SSH rows first, so only call it when the caller actually sent
    # IP fields; otherwise a partial update would wipe the stored IPs.
    if "ip_reelle" in data or "ip_connexion" in data:
        update_server_ips(db, server_id, data.get("ip_reelle"), data.get("ip_connexion"))

    # Direct fields. Column names come from this fixed list, never from the
    # caller, so interpolating them into the statement is safe.
    direct_fields = [
        "tier", "etat", "patch_os_owner", "responsable_nom", "referent_nom",
        "mode_operatoire", "commentaire", "ssh_method", "domain_ltd",
        "pref_patch_jour", "pref_patch_heure",
    ]
    updates = []
    params = {"id": server_id}
    changed = []
    for field in direct_fields:
        if data.get(field) is not None:
            updates.append(f"{field} = :{field}")
            params[field] = data[field]
            changed.append(field)

    if updates:
        updates.append("updated_at = now()")
        db.execute(
            text(f"UPDATE servers SET {', '.join(updates)} WHERE id = :id"),
            params,
        )

    # Audit
    db.execute(text(
        "INSERT INTO audit_log (username, action, entity_type, entity_id) "
        "VALUES (:un, 'EDIT_SERVER', 'server', :sid)"
    ), {"un": username, "sid": server_id})
    db.commit()
    return changed


def get_reference_data(db):
    """Return the reference lists used by filters and forms.

    Args:
        db: Database session/connection exposing ``execute()``.

    Returns:
        A ``(domains, envs)`` tuple; each element is a list of rows with
        ``code`` and ``name``, ordered by ``display_order``.
    """
    domains = db.execute(
        text("SELECT code, name FROM domains ORDER BY display_order")
    ).fetchall()
    envs = db.execute(
        text("SELECT code, name FROM environments ORDER BY display_order")
    ).fetchall()
    return domains, envs