From 1628413f55cd13b6cbdbbe4d882b8b6d4bc7fd80 Mon Sep 17 00:00:00 2001 From: Christian Berendt Date: Tue, 17 Mar 2026 20:06:36 +0100 Subject: [PATCH] Centralize SSH/clush command building and extract host resolution utilities Move host resolution functions (resolve_hostname_to_ip, get_primary_ipv4_from_netbox, resolve_host_with_fallback, get_hosts_from_group, select_host_from_list) from console.py into new osism/utils/hosts.py module. Add build_ssh_command() and build_clush_command() builder functions to osism/utils/ssh.py to eliminate duplicated SSH option boilerplate across console, log, report, compose, and container commands. This also removes shell=True subprocess calls in compose, container, and log commands. AI-assisted: Claude Code Signed-off-by: Christian Berendt --- osism/commands/compose.py | 15 ++- osism/commands/console.py | 219 ++++-------------------------------- osism/commands/container.py | 23 ++-- osism/commands/log.py | 44 ++------ osism/commands/report.py | 111 +++++------------- osism/utils/hosts.py | 150 ++++++++++++++++++++++++ osism/utils/ssh.py | 100 +++++++++++++++- 7 files changed, 328 insertions(+), 334 deletions(-) create mode 100644 osism/utils/hosts.py diff --git a/osism/commands/compose.py b/osism/commands/compose.py index a48fe417..2ca07994 100644 --- a/osism/commands/compose.py +++ b/osism/commands/compose.py @@ -6,8 +6,11 @@ from cliff.command import Command from loguru import logger -from osism import settings -from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH +from osism.utils.ssh import ( + build_ssh_command, + ensure_known_hosts_file, + KNOWN_HOSTS_PATH, +) class Run(Command): @@ -33,13 +36,9 @@ def take_action(self, parsed_args): f"Could not initialize {KNOWN_HOSTS_PATH}, SSH may show warnings" ) - ssh_command = ( + remote_command = ( f"docker compose --project-directory=/opt/{environment} {arguments}" ) - ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}" # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable - subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} '{ssh_command}'", - shell=True, - ) + subprocess.call(build_ssh_command(host, remote_command=remote_command)) diff --git a/osism/commands/console.py b/osism/commands/console.py index 665579ac..3f42b061 100644 --- a/osism/commands/console.py +++ b/osism/commands/console.py @@ -1,155 +1,25 @@ # SPDX-License-Identifier: Apache-2.0 -import json import shlex -import socket import subprocess -from typing import Optional from cliff.command import Command from loguru import logger from prompt_toolkit import prompt -from osism import settings, utils -from osism.utils.inventory import get_hosts_from_inventory, get_inventory_path -from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH - - -def resolve_hostname_to_ip(hostname: str) -> Optional[str]: - """ - Attempt to resolve hostname to IPv4 address using DNS. - - Args: - hostname: The hostname to resolve - - Returns: - IPv4 address string if successful, None if resolution fails - """ - try: - ip_address = socket.gethostbyname(hostname) - logger.debug(f"Resolved hostname {hostname} to {ip_address}") - return ip_address - except socket.gaierror as e: - logger.debug(f"DNS resolution failed for {hostname}: {e}") - return None - - -def get_primary_ipv4_from_netbox(hostname: str) -> Optional[str]: - """ - Retrieve primary IPv4 address for hostname from Netbox. - - Args: - hostname: The hostname to look up in Netbox - - Returns: - Primary IPv4 address string if found, None otherwise - """ - if not utils.nb: - logger.debug("Netbox integration not available") - return None - - try: - device = utils.nb.dcim.devices.get(name=hostname) - if device and device.primary_ip4: - ip_address = str(device.primary_ip4.address).split("/")[0] - logger.info(f"Found primary IPv4 for {hostname} in Netbox: {ip_address}") - return ip_address - else: - logger.debug(f"No device or primary IPv4 found for {hostname} in Netbox") - return None - except Exception as e: - logger.warning(f"Error querying Netbox for {hostname}: {e}") - return None - - -def resolve_host_with_fallback(hostname: str) -> str: - """ - Resolve hostname with Netbox fallback. - - First attempts DNS resolution. If that fails and Netbox integration is enabled, - attempts to retrieve the primary IPv4 address from Netbox. - - Args: - hostname: The hostname to resolve - - Returns: - Resolved IP address or original hostname if all resolution attempts fail - """ - # First try DNS resolution - ip_address = resolve_hostname_to_ip(hostname) - if ip_address: - return ip_address - - # Fallback to Netbox if DNS resolution failed - logger.info(f"DNS resolution failed for {hostname}, trying Netbox fallback") - netbox_ip = get_primary_ipv4_from_netbox(hostname) - if netbox_ip: - logger.info(f"Using IPv4 address {netbox_ip} from Netbox for {hostname}") - return netbox_ip - - # If both methods fail, return original hostname and let SSH handle the error - logger.warning( - f"Could not resolve {hostname} via DNS or Netbox, using original hostname" - ) - return hostname - - -def get_hosts_from_group(group: str) -> list: - """Resolve an Ansible inventory group to its list of hosts. - - Args: - group: The inventory group name to resolve - - Returns: - Sorted list of hostnames in the group, or empty list if the - group does not exist or cannot be resolved. - """ - try: - inventory_path = get_inventory_path("/ansible/inventory/hosts.yml") - result = subprocess.check_output( - [ - "ansible-inventory", - "-i", - inventory_path, - "--list", - "--limit", - group, - ], - stderr=subprocess.DEVNULL, - ) - inventory = json.loads(result) - hosts = get_hosts_from_inventory(inventory) - return sorted(hosts) - except Exception: - logger.debug("Could not resolve group %r", group, exc_info=True) - return [] - - -def select_host_from_list(hosts: list) -> Optional[str]: - """Display a numbered list of hosts and let the user choose one. - - Args: - hosts: List of hostnames to choose from - - Returns: - The selected hostname, or None if the selection was cancelled. - """ - print(f"\nGroup contains {len(hosts)} hosts:\n") - for i, host in enumerate(hosts, 1): - print(f" {i}) {host}") - print() - - while True: - answer = prompt("Select host [1-{}]: ".format(len(hosts))) - if answer.strip().lower() in ("q", "quit", "exit"): - return None - try: - index = int(answer.strip()) - if 1 <= index <= len(hosts): - return hosts[index - 1] - except ValueError: - pass - print(f"Please enter a number between 1 and {len(hosts)}, or 'q' to cancel.") +from osism.utils.hosts import ( # noqa: F401 + resolve_hostname_to_ip, + get_primary_ipv4_from_netbox, + resolve_host_with_fallback, + get_hosts_from_group, + select_host_from_list, +) +from osism.utils.ssh import ( + build_clush_command, + build_ssh_command, + ensure_known_hosts_file, + KNOWN_HOSTS_PATH, +) class Run(Command): @@ -197,29 +67,10 @@ def take_action(self, parsed_args): type_console = "clush" host = host[1:] - ssh_options = [ - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - ] - if type_console == "ansible": subprocess.call(["/run-ansible-console.sh", host]) elif type_console == "clush": - # SSH options (IdentityFile, StrictHostKeyChecking, LogLevel) - # are configured in clush.conf, no need to pass them here. - subprocess.call( - [ - "/usr/local/bin/clush", - "-l", - settings.OPERATOR_USER, - "-g", - host, - ] - ) + subprocess.call(build_clush_command(group=host)) elif type_console == "ssh": # Try to resolve as an inventory group group_hosts = get_hosts_from_group(host) @@ -235,15 +86,7 @@ def take_action(self, parsed_args): # Resolve hostname with Netbox fallback resolved_host = resolve_host_with_fallback(host) # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable - subprocess.call( - [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - *ssh_options, - f"{settings.OPERATOR_USER}@{resolved_host}", - ] - ) + subprocess.call(build_ssh_command(resolved_host)) elif type_console == "container_prompt": while True: command = prompt(f"{host[:-1]}>>> ") @@ -255,14 +98,7 @@ def take_action(self, parsed_args): resolved_host = resolve_host_with_fallback(host[:-1]) # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable subprocess.call( - [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - *ssh_options, - f"{settings.OPERATOR_USER}@{resolved_host}", - ssh_command, - ] + build_ssh_command(resolved_host, remote_command=ssh_command) ) elif type_console == "container": target_containername = host.split("/")[1] @@ -270,27 +106,14 @@ def take_action(self, parsed_args): target_command = "bash" ssh_command = f"docker exec -it {shlex.quote(target_containername)} {shlex.quote(target_command)}" - ssh_options = [ - "-o", - "RequestTTY=force", - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - ] # Resolve hostname with Netbox fallback resolved_target_host = resolve_host_with_fallback(target_host) # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable subprocess.call( - [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - *ssh_options, - f"{settings.OPERATOR_USER}@{resolved_target_host}", - ssh_command, - ] + build_ssh_command( + resolved_target_host, + remote_command=ssh_command, + request_tty=True, + ) ) diff --git a/osism/commands/container.py b/osism/commands/container.py index 363ad7c3..cc8c63e6 100644 --- a/osism/commands/container.py +++ b/osism/commands/container.py @@ -7,8 +7,11 @@ from loguru import logger from prompt_toolkit import prompt -from osism import settings -from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH +from osism.utils.ssh import ( + build_ssh_command, + ensure_known_hosts_file, + KNOWN_HOSTS_PATH, +) class Run(Command): @@ -33,25 +36,17 @@ def take_action(self, parsed_args): f"Could not initialize {KNOWN_HOSTS_PATH}, SSH may show warnings" ) - ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}" - if not command: while True: command = prompt(f"{host}>>> ") if command in ["Exit", "exit", "EXIT"]: break - ssh_command = f"docker {command}" + remote_command = f"docker {command}" # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable - subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}", - shell=True, - ) + subprocess.call(build_ssh_command(host, remote_command=remote_command)) else: - ssh_command = f"docker {command}" + remote_command = f"docker {command}" # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable - subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}", - shell=True, - ) + subprocess.call(build_ssh_command(host, remote_command=remote_command)) diff --git a/osism/commands/log.py b/osism/commands/log.py index 4ee250ec..e0069ca9 100644 --- a/osism/commands/log.py +++ b/osism/commands/log.py @@ -13,8 +13,13 @@ import requests from osism import settings -from osism.commands.console import get_hosts_from_group, resolve_host_with_fallback -from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH +from osism.utils.hosts import get_hosts_from_group, resolve_host_with_fallback +from osism.utils.ssh import ( + build_clush_command, + build_ssh_command, + ensure_known_hosts_file, + KNOWN_HOSTS_PATH, +) class Ansible(Command): @@ -62,14 +67,10 @@ def take_action(self, parsed_args): f"Could not initialize {KNOWN_HOSTS_PATH}, SSH may show warnings" ) - ssh_command = f"docker logs {parameters} {container_name}" - ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}" + remote_command = f"docker logs {parameters} {container_name}" # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable - subprocess.call( - f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}", - shell=True, - ) + subprocess.call(build_ssh_command(host, remote_command=remote_command)) class File(Command): @@ -133,14 +134,7 @@ def take_action(self, parsed_args): # Use clush for multi-node log tailing. # SSH options are configured in clush.conf. rc = subprocess.call( - [ - "/usr/local/bin/clush", - "-l", - settings.OPERATOR_USER, - "-w", - ",".join(group_hosts), - tail_command, - ] + build_clush_command(hosts=group_hosts, remote_command=tail_command) ) if rc != 0: logger.error( @@ -158,25 +152,9 @@ def take_action(self, parsed_args): # Resolve hostname with DNS + Netbox fallback resolved_host = resolve_host_with_fallback(host) - ssh_options = [ - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - ] - # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable rc = subprocess.call( - [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - *ssh_options, - f"{settings.OPERATOR_USER}@{resolved_host}", - tail_command, - ] + build_ssh_command(resolved_host, remote_command=tail_command) ) if rc != 0: logger.error( diff --git a/osism/commands/report.py b/osism/commands/report.py index 67a13cdc..ef2fd99d 100644 --- a/osism/commands/report.py +++ b/osism/commands/report.py @@ -9,10 +9,13 @@ from loguru import logger from tabulate import tabulate -from osism import settings -from osism.commands.console import resolve_host_with_fallback +from osism.utils.hosts import resolve_host_with_fallback from osism.utils.inventory import get_hosts_from_inventory, get_inventory_path -from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH +from osism.utils.ssh import ( + build_ssh_command, + ensure_known_hosts_file, + KNOWN_HOSTS_PATH, +) class Memory(Command): @@ -63,19 +66,6 @@ def take_action(self, parsed_args): logger.error("No hosts found in inventory.") return - ssh_base = [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - "-o", - "ConnectTimeout=10", - ] dmidecode_command = ( "sudo dmidecode -t memory | grep 'Size:' | grep -v 'No Module'" " | awk '{if($3==\"MB\") s+=$2/1024; else s+=$2} END {print s}'" @@ -91,11 +81,11 @@ def take_action(self, parsed_args): try: memory_result = subprocess.run( - [ - *ssh_base, - f"{settings.OPERATOR_USER}@{resolved_host}", - dmidecode_command, - ], + build_ssh_command( + resolved_host, + remote_command=dmidecode_command, + connect_timeout=10, + ), capture_output=True, text=True, timeout=30, @@ -109,11 +99,11 @@ def take_action(self, parsed_args): continue uuid_result = subprocess.run( - [ - *ssh_base, - f"{settings.OPERATOR_USER}@{resolved_host}", - uuid_command, - ], + build_ssh_command( + resolved_host, + remote_command=uuid_command, + connect_timeout=10, + ), capture_output=True, text=True, timeout=30, @@ -201,19 +191,6 @@ def take_action(self, parsed_args): logger.error("No hosts found in inventory.") return - ssh_base = [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - "-o", - "ConnectTimeout=10", - ] lldp_command = "lldpctl -f json" table = [] @@ -224,11 +201,11 @@ def take_action(self, parsed_args): try: lldp_result = subprocess.run( - [ - *ssh_base, - f"{settings.OPERATOR_USER}@{resolved_host}", - lldp_command, - ], + build_ssh_command( + resolved_host, + remote_command=lldp_command, + connect_timeout=10, + ), capture_output=True, text=True, timeout=30, @@ -370,19 +347,6 @@ def take_action(self, parsed_args): logger.error("No hosts found in inventory.") return - ssh_base = [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - "-o", - "ConnectTimeout=10", - ] bgp_command = 'sudo vtysh -c "show bgp summary json"' table = [] @@ -393,11 +357,11 @@ def take_action(self, parsed_args): try: bgp_result = subprocess.run( - [ - *ssh_base, - f"{settings.OPERATOR_USER}@{resolved_host}", - bgp_command, - ], + build_ssh_command( + resolved_host, + remote_command=bgp_command, + connect_timeout=10, + ), capture_output=True, text=True, timeout=30, @@ -547,19 +511,6 @@ def take_action(self, parsed_args): logger.error("No hosts found in inventory.") return - ssh_base = [ - "/usr/bin/ssh", - "-i", - "/ansible/secrets/id_rsa.operator", - "-o", - "StrictHostKeyChecking=no", - "-o", - "LogLevel=ERROR", - "-o", - f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", - "-o", - "ConnectTimeout=10", - ] fact_command = "cat /etc/ansible/facts.d/osism.fact" section = parsed_args.type @@ -574,11 +525,11 @@ def take_action(self, parsed_args): try: fact_result = subprocess.run( - [ - *ssh_base, - f"{settings.OPERATOR_USER}@{resolved_host}", - fact_command, - ], + build_ssh_command( + resolved_host, + remote_command=fact_command, + connect_timeout=10, + ), capture_output=True, text=True, timeout=30, diff --git a/osism/utils/hosts.py b/osism/utils/hosts.py new file mode 100644 index 00000000..9ed56087 --- /dev/null +++ b/osism/utils/hosts.py @@ -0,0 +1,150 @@ +# SPDX-License-Identifier: Apache-2.0 + +import json +import socket +import subprocess +from typing import Optional + +from loguru import logger + +from osism import utils +from osism.utils.inventory import get_hosts_from_inventory, get_inventory_path + + +def resolve_hostname_to_ip(hostname: str) -> Optional[str]: + """ + Attempt to resolve hostname to IPv4 address using DNS. + + Args: + hostname: The hostname to resolve + + Returns: + IPv4 address string if successful, None if resolution fails + """ + try: + ip_address = socket.gethostbyname(hostname) + logger.debug(f"Resolved hostname {hostname} to {ip_address}") + return ip_address + except socket.gaierror as e: + logger.debug(f"DNS resolution failed for {hostname}: {e}") + return None + + +def get_primary_ipv4_from_netbox(hostname: str) -> Optional[str]: + """ + Retrieve primary IPv4 address for hostname from Netbox. + + Args: + hostname: The hostname to look up in Netbox + + Returns: + Primary IPv4 address string if found, None otherwise + """ + if not utils.nb: + logger.debug("Netbox integration not available") + return None + + try: + device = utils.nb.dcim.devices.get(name=hostname) + if device and device.primary_ip4: + ip_address = str(device.primary_ip4.address).split("/")[0] + logger.info(f"Found primary IPv4 for {hostname} in Netbox: {ip_address}") + return ip_address + else: + logger.debug(f"No device or primary IPv4 found for {hostname} in Netbox") + return None + except Exception as e: + logger.warning(f"Error querying Netbox for {hostname}: {e}") + return None + + +def resolve_host_with_fallback(hostname: str) -> str: + """ + Resolve hostname with Netbox fallback. + + First attempts DNS resolution. If that fails and Netbox integration is enabled, + attempts to retrieve the primary IPv4 address from Netbox. + + Args: + hostname: The hostname to resolve + + Returns: + Resolved IP address or original hostname if all resolution attempts fail + """ + # First try DNS resolution + ip_address = resolve_hostname_to_ip(hostname) + if ip_address: + return ip_address + + # Fallback to Netbox if DNS resolution failed + logger.info(f"DNS resolution failed for {hostname}, trying Netbox fallback") + netbox_ip = get_primary_ipv4_from_netbox(hostname) + if netbox_ip: + logger.info(f"Using IPv4 address {netbox_ip} from Netbox for {hostname}") + return netbox_ip + + # If both methods fail, return original hostname and let SSH handle the error + logger.warning( + f"Could not resolve {hostname} via DNS or Netbox, using original hostname" + ) + return hostname + + +def get_hosts_from_group(group: str) -> list: + """Resolve an Ansible inventory group to its list of hosts. + + Args: + group: The inventory group name to resolve + + Returns: + Sorted list of hostnames in the group, or empty list if the + group does not exist or cannot be resolved. + """ + try: + inventory_path = get_inventory_path("/ansible/inventory/hosts.yml") + result = subprocess.check_output( + [ + "ansible-inventory", + "-i", + inventory_path, + "--list", + "--limit", + group, + ], + stderr=subprocess.DEVNULL, + ) + inventory = json.loads(result) + hosts = get_hosts_from_inventory(inventory) + return sorted(hosts) + except Exception: + logger.debug("Could not resolve group %r", group, exc_info=True) + return [] + + +def select_host_from_list(hosts: list) -> Optional[str]: + """Display a numbered list of hosts and let the user choose one. + + Args: + hosts: List of hostnames to choose from + + Returns: + The selected hostname, or None if the selection was cancelled. + """ + from prompt_toolkit import prompt as pt_prompt + + print(f"\nGroup contains {len(hosts)} hosts:\n") + for i, host in enumerate(hosts, 1): + print(f" {i}) {host}") + print() + + while True: + answer = pt_prompt("Select host [1-{}]: ".format(len(hosts))) + if answer.strip().lower() in ("q", "quit", "exit"): + return None + try: + index = int(answer.strip()) + if 1 <= index <= len(hosts): + return hosts[index - 1] + except ValueError: + pass + print(f"Please enter a number between 1 and {len(hosts)}, or 'q' to cancel.") diff --git a/osism/utils/ssh.py b/osism/utils/ssh.py index 6070f37e..e1468856 100644 --- a/osism/utils/ssh.py +++ b/osism/utils/ssh.py @@ -3,14 +3,112 @@ import os import subprocess from typing import List, Optional + from loguru import logger -from osism import utils +from osism import settings, utils # Default path for SSH known_hosts file KNOWN_HOSTS_PATH = "/share/known_hosts" +# SSH / clush binary paths and default identity file +SSH_BINARY = "/usr/bin/ssh" +SSH_KEY_PATH = "/ansible/secrets/id_rsa.operator" +CLUSH_BINARY = "/usr/local/bin/clush" + + +def build_ssh_command( + host: str, + remote_command: str = "", + *, + user: str = "", + connect_timeout: int | None = None, + request_tty: bool = False, + identity_file: str = SSH_KEY_PATH, + known_hosts_path: str = KNOWN_HOSTS_PATH, +) -> list[str]: + """Build an SSH command as a list for subprocess. + + Args: + host: Target host (hostname or IP). + remote_command: Optional command to execute on the remote host. + user: SSH login user. Defaults to ``settings.OPERATOR_USER``. + connect_timeout: Optional SSH ConnectTimeout in seconds. + request_tty: If True, add ``-o RequestTTY=force``. + identity_file: Path to the SSH identity (private key) file. + known_hosts_path: Path to the SSH known_hosts file. + + Returns: + Command list suitable for :func:`subprocess.call` / :func:`subprocess.run`. + """ + if not user: + user = settings.OPERATOR_USER + + cmd: list[str] = [ + SSH_BINARY, + "-i", + identity_file, + "-o", + "StrictHostKeyChecking=no", + "-o", + "LogLevel=ERROR", + "-o", + f"UserKnownHostsFile={known_hosts_path}", + ] + + if connect_timeout is not None: + cmd.extend(["-o", f"ConnectTimeout={connect_timeout}"]) + + if request_tty: + cmd.extend(["-o", "RequestTTY=force"]) + + cmd.append(f"{user}@{host}") + + if remote_command: + cmd.append(remote_command) + + return cmd + + +def build_clush_command( + hosts: list[str] | None = None, + group: str | None = None, + remote_command: str = "", + *, + user: str = "", +) -> list[str]: + """Build a clush command as a list for subprocess. + + SSH options (IdentityFile, StrictHostKeyChecking, LogLevel) are + expected to be configured in ``clush.conf``. + + Either *hosts* or *group* must be provided. + + Args: + hosts: Explicit list of target hosts (``-w``). + group: Inventory group name (``-g``). + remote_command: Command to execute on remote hosts. + user: SSH login user. Defaults to ``settings.OPERATOR_USER``. + + Returns: + Command list suitable for :func:`subprocess.call` / :func:`subprocess.run`. + """ + if not user: + user = settings.OPERATOR_USER + + cmd: list[str] = [CLUSH_BINARY, "-l", user] + + if hosts: + cmd.extend(["-w", ",".join(hosts)]) + elif group: + cmd.extend(["-g", group]) + + if remote_command: + cmd.append(remote_command) + + return cmd + def ensure_known_hosts_file(known_hosts_path: str = KNOWN_HOSTS_PATH) -> bool: """