"""Service exécution patch yum : - yum_dryrun() : `sudo -n yum update --assumeno --exclude=...` - yum_update() : `sudo -n yum update -y --exclude=...` Gère la résolution DNS, ouverture SSH (réutilise _resolve/_connect du realtime_audit_service), validation des excludes (anti-injection shell), capture de la sortie + heuristique de résumé du plan yum. """ import logging import re from typing import Dict, Any, List from .realtime_audit_service import _resolve, _connect, PARAMIKO_OK log = logging.getLogger("patchcenter.patch_run") # Timeouts SSH (secondes) TIMEOUT_DRYRUN = 60 TIMEOUT_UPDATE = 1800 # 30 min — yum update peut être long # Whitelist caractères autorisés dans un nom de paquet (anti-injection shell) EXCLUDE_RE = re.compile(r"^[A-Za-z0-9._*\-/+]+$") def _safe_excludes(raw) -> List[str]: """Normalise et filtre la string `effective_excludes` (séparée par espaces). Rejette les patterns suspects pour éviter toute injection.""" if not raw: return [] parts = str(raw).split() return [p for p in parts if EXCLUDE_RE.match(p)] def _build_cmd(mode: str, excludes: List[str]) -> str: """mode = 'dryrun' (--assumeno) ou 'update' (-y).""" flag = "--assumeno" if mode == "dryrun" else "-y" parts = ["sudo", "-n", "yum", "update", flag] for e in excludes: parts.append(f"--exclude={e}") parts.append("2>&1") return " ".join(parts) def _exec(client, cmd: str, timeout: int) -> Dict[str, Any]: try: stdin, stdout, stderr = client.exec_command(cmd, timeout=timeout) out = stdout.read().decode("utf-8", "replace") err = stderr.read().decode("utf-8", "replace") rc = stdout.channel.recv_exit_status() return {"rc": rc, "stdout": out, "stderr": err} except Exception as e: return {"rc": -1, "stdout": "", "stderr": f"exec error: {e}"} def _open_ssh(hostname: str): target = _resolve(hostname) if not target: return None, None, "DNS résolution impossible" if not PARAMIKO_OK: return None, target, "paramiko non disponible côté serveur PatchCenter" client = _connect(target, hostname) if not client: return None, target, "Connexion SSH échouée" return client, target, None def _summary(stdout: str) -> List[str]: """Extrait les lignes-clé du plan yum (compte de paquets, 'Nothing to do'…).""" keys = ( "package(s)", "paquet(s)", "nothing to do", "rien à faire", "transaction summary", "résumé de la transaction", "install ", "installation ", "upgrade ", "mise à niveau", "complete!", "terminé !", "no packages marked for update", "no package", ) summary = [] for ln in stdout.splitlines(): ll = ln.lower().strip() if any(k in ll for k in keys): summary.append(ln.strip()) if len(summary) >= 25: break return summary def yum_dryrun(hostname: str, excludes_raw) -> Dict[str, Any]: """Lance un dry-run (--assumeno). rc=0 → rien à faire, rc=1 → updates dispo (normal).""" excludes = _safe_excludes(excludes_raw) client, target, err = _open_ssh(hostname) if err: return {"ok": False, "detail": err, "target": target, "excludes": excludes} try: cmd = _build_cmd("dryrun", excludes) r = _exec(client, cmd, TIMEOUT_DRYRUN) finally: try: client.close() except Exception: pass # Avec --assumeno : rc=0 si "Nothing to do", rc=1 si plan d'updates dispo ok = r["rc"] in (0, 1) return { "ok": ok, "rc": r["rc"], "cmd": cmd, "target": target, "excludes": excludes, "summary": _summary(r["stdout"]), "stdout_tail": r["stdout"][-3000:], "stderr": r["stderr"][:500] if r["stderr"] else "", } def yum_update(hostname: str, excludes_raw) -> Dict[str, Any]: """Lance le vrai patch (-y).""" excludes = _safe_excludes(excludes_raw) client, target, err = _open_ssh(hostname) if err: return {"ok": False, "detail": err, "target": target, "excludes": excludes} try: cmd = _build_cmd("update", excludes) r = _exec(client, cmd, TIMEOUT_UPDATE) finally: try: client.close() except Exception: pass ok = (r["rc"] == 0) return { "ok": ok, "rc": r["rc"], "cmd": cmd, "target": target, "excludes": excludes, "summary": _summary(r["stdout"]), "stdout_tail": r["stdout"][-5000:], "stderr": r["stderr"][:500] if r["stderr"] else "", }