"""Service de deploiement Qualys Cloud Agent via SSH""" import os import logging import glob from datetime import datetime log = logging.getLogger(__name__) AGENTS_DIR = "/opt/patchcenter/agents" try: import paramiko PARAMIKO_OK = True except ImportError: PARAMIKO_OK = False def list_packages(): """Liste les packages disponibles dans /opt/patchcenter/agents/""" packages = {"deb": [], "rpm": []} if not os.path.isdir(AGENTS_DIR): return packages for f in sorted(os.listdir(AGENTS_DIR)): path = os.path.join(AGENTS_DIR, f) size_mb = round(os.path.getsize(path) / 1024 / 1024, 1) if f.endswith(".deb"): packages["deb"].append({"name": f, "path": path, "size": size_mb}) elif f.endswith(".rpm"): packages["rpm"].append({"name": f, "path": path, "size": size_mb}) return packages def _get_ssh_client(hostname, ssh_user, ssh_key_path, ssh_port=22): """Crée un client SSH paramiko""" if not PARAMIKO_OK: return None, "paramiko non disponible" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: for cls in [paramiko.Ed25519Key, paramiko.RSAKey, paramiko.ECDSAKey]: try: key = cls.from_private_key_file(ssh_key_path) client.connect(hostname, port=ssh_port, username=ssh_user, pkey=key, timeout=15, look_for_keys=False, allow_agent=False) return client, None except Exception: continue return None, f"Impossible de charger la cle {ssh_key_path}" except Exception as e: return None, str(e) def _run_cmd(client, cmd, sudo=False, timeout=120): """Execute une commande SSH""" if sudo: _, stdout_id, _ = client.exec_command("id -u", timeout=5) uid = stdout_id.read().decode().strip() if uid != "0": cmd = f"sudo {cmd}" _, stdout, stderr = client.exec_command(cmd, timeout=timeout) exit_code = stdout.channel.recv_exit_status() out = stdout.read().decode("utf-8", errors="replace") err = stderr.read().decode("utf-8", errors="replace") return exit_code, out, err def check_agent(hostname, ssh_user, ssh_key_path, ssh_port=22): """Vérifie le statut de l'agent Qualys sur un serveur""" client, error = _get_ssh_client(hostname, ssh_user, ssh_key_path, ssh_port) if not client: return {"hostname": hostname, "status": "CONNECTION_FAILED", "detail": error} result = {"hostname": hostname} # Check if installed code, out, _ = _run_cmd(client, "which qualys-cloud-agent 2>/dev/null || rpm -q qualys-cloud-agent 2>/dev/null || dpkg -l qualys-cloud-agent 2>/dev/null | grep '^ii'") if code != 0 and not out.strip(): result["status"] = "NOT_INSTALLED" result["detail"] = "Agent non installe" client.close() return result # Check if running code, out, _ = _run_cmd(client, "systemctl is-active qualys-cloud-agent 2>/dev/null") status = out.strip() result["service_status"] = status # Get version code, out, _ = _run_cmd(client, "qualys-cloud-agent --version 2>/dev/null || cat /etc/qualys/cloud-agent/qualys-cloud-agent.conf 2>/dev/null | grep -i version | head -1") result["version"] = out.strip()[:50] # Get last checkin from log code, out, _ = _run_cmd(client, "tail -5 /var/log/qualys/qualys-cloud-agent.log 2>/dev/null | grep 'HTTP response code: 200' | tail -1 | awk '{print $1, $2}'") result["last_checkin"] = out.strip()[:25] if status == "active": result["status"] = "ACTIVE" result["detail"] = "Agent actif" elif status == "inactive" or status == "dead": result["status"] = "INACTIVE" result["detail"] = "Agent installe mais inactif" else: result["status"] = "UNKNOWN" result["detail"] = f"Statut: {status}" client.close() return result def deploy_agent(hostname, ssh_user, ssh_key_path, ssh_port, os_family, package_path, activation_id, customer_id, server_uri, on_line=None): """Deploie l'agent Qualys sur un serveur""" def emit(msg): if on_line: on_line(msg) log.info(f"[{hostname}] {msg}") emit(f"Connexion SSH {ssh_user}@{hostname}:{ssh_port}...") client, error = _get_ssh_client(hostname, ssh_user, ssh_key_path, ssh_port) if not client: emit(f"ERREUR connexion: {error}") return {"hostname": hostname, "status": "FAILED", "detail": error} result = {"hostname": hostname, "status": "PENDING"} try: # 1. Detect OS if not provided if not os_family: code, out, _ = _run_cmd(client, "cat /etc/os-release 2>/dev/null | head -3") os_family = "linux" # default if "debian" in out.lower() or "ubuntu" in out.lower(): os_family = "debian" elif "red hat" in out.lower() or "centos" in out.lower() or "rocky" in out.lower(): os_family = "rhel" emit(f"OS detecte: {os_family}") # 2. Check if already installed code, out, _ = _run_cmd(client, "systemctl is-active qualys-cloud-agent 2>/dev/null") if out.strip() == "active": emit("Agent deja installe et actif - skip") result["status"] = "ALREADY_INSTALLED" result["detail"] = "Agent deja actif" client.close() return result # 3. Copy package pkg_name = os.path.basename(package_path) remote_path = f"/tmp/{pkg_name}" emit(f"Copie {pkg_name} ({os.path.getsize(package_path)//1024//1024} Mo)...") sftp = client.open_sftp() sftp.put(package_path, remote_path) sftp.close() emit("Copie terminee") # 4. Install is_deb = pkg_name.endswith(".deb") if is_deb: emit("Installation (dpkg)...") code, out, err = _run_cmd(client, f"dpkg --install {remote_path}", sudo=True, timeout=120) else: emit("Installation (rpm)...") code, out, err = _run_cmd(client, f"rpm -ivh --nosignature {remote_path}", sudo=True, timeout=120) if code != 0 and "already installed" not in (out + err).lower(): emit(f"ERREUR installation (code {code}): {err[:200]}") result["status"] = "INSTALL_FAILED" result["detail"] = err[:200] client.close() return result emit("Installation OK") # 5. Activate emit("Activation de l'agent...") activate_cmd = ( f"/usr/local/qualys/cloud-agent/bin/qualys-cloud-agent.sh " f"ActivationId={activation_id} " f"CustomerId={customer_id} " f"ServerUri={server_uri} " f"ProviderName=NONE" ) code, out, err = _run_cmd(client, activate_cmd, sudo=True, timeout=60) if code != 0: emit(f"ERREUR activation (code {code}): {err[:200]}") result["status"] = "ACTIVATE_FAILED" result["detail"] = err[:200] client.close() return result emit("Activation OK") # 6. Restart service emit("Redemarrage du service...") _run_cmd(client, "systemctl restart qualys-cloud-agent", sudo=True) # 7. Verify code, out, _ = _run_cmd(client, "systemctl is-active qualys-cloud-agent") if out.strip() == "active": emit("Agent deploye et actif !") result["status"] = "SUCCESS" result["detail"] = "Agent deploye avec succes" else: emit(f"Agent installe mais statut: {out.strip()}") result["status"] = "PARTIAL" result["detail"] = f"Installe, service: {out.strip()}" # 8. Cleanup _run_cmd(client, f"rm -f {remote_path}") except Exception as e: emit(f"ERREUR: {e}") result["status"] = "FAILED" result["detail"] = str(e)[:200] client.close() return result