-
Notifications
You must be signed in to change notification settings - Fork 3
Centralize SSH/clush command building and extract host resolution uti… #2149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,155 +1,25 @@ | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| import json | ||
| import shlex | ||
| import socket | ||
| import subprocess | ||
| from typing import Optional | ||
|
|
||
| from cliff.command import Command | ||
| from loguru import logger | ||
| from prompt_toolkit import prompt | ||
|
|
||
| from osism import settings, utils | ||
| from osism.utils.inventory import get_hosts_from_inventory, get_inventory_path | ||
| from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH | ||
|
|
||
|
|
||
| def resolve_hostname_to_ip(hostname: str) -> Optional[str]: | ||
| """ | ||
| Attempt to resolve hostname to IPv4 address using DNS. | ||
|
|
||
| Args: | ||
| hostname: The hostname to resolve | ||
|
|
||
| Returns: | ||
| IPv4 address string if successful, None if resolution fails | ||
| """ | ||
| try: | ||
| ip_address = socket.gethostbyname(hostname) | ||
| logger.debug(f"Resolved hostname {hostname} to {ip_address}") | ||
| return ip_address | ||
| except socket.gaierror as e: | ||
| logger.debug(f"DNS resolution failed for {hostname}: {e}") | ||
| return None | ||
|
|
||
|
|
||
| def get_primary_ipv4_from_netbox(hostname: str) -> Optional[str]: | ||
| """ | ||
| Retrieve primary IPv4 address for hostname from Netbox. | ||
|
|
||
| Args: | ||
| hostname: The hostname to look up in Netbox | ||
|
|
||
| Returns: | ||
| Primary IPv4 address string if found, None otherwise | ||
| """ | ||
| if not utils.nb: | ||
| logger.debug("Netbox integration not available") | ||
| return None | ||
|
|
||
| try: | ||
| device = utils.nb.dcim.devices.get(name=hostname) | ||
| if device and device.primary_ip4: | ||
| ip_address = str(device.primary_ip4.address).split("/")[0] | ||
| logger.info(f"Found primary IPv4 for {hostname} in Netbox: {ip_address}") | ||
| return ip_address | ||
| else: | ||
| logger.debug(f"No device or primary IPv4 found for {hostname} in Netbox") | ||
| return None | ||
| except Exception as e: | ||
| logger.warning(f"Error querying Netbox for {hostname}: {e}") | ||
| return None | ||
|
|
||
|
|
||
| def resolve_host_with_fallback(hostname: str) -> str: | ||
| """ | ||
| Resolve hostname with Netbox fallback. | ||
|
|
||
| First attempts DNS resolution. If that fails and Netbox integration is enabled, | ||
| attempts to retrieve the primary IPv4 address from Netbox. | ||
|
|
||
| Args: | ||
| hostname: The hostname to resolve | ||
|
|
||
| Returns: | ||
| Resolved IP address or original hostname if all resolution attempts fail | ||
| """ | ||
| # First try DNS resolution | ||
| ip_address = resolve_hostname_to_ip(hostname) | ||
| if ip_address: | ||
| return ip_address | ||
|
|
||
| # Fallback to Netbox if DNS resolution failed | ||
| logger.info(f"DNS resolution failed for {hostname}, trying Netbox fallback") | ||
| netbox_ip = get_primary_ipv4_from_netbox(hostname) | ||
| if netbox_ip: | ||
| logger.info(f"Using IPv4 address {netbox_ip} from Netbox for {hostname}") | ||
| return netbox_ip | ||
|
|
||
| # If both methods fail, return original hostname and let SSH handle the error | ||
| logger.warning( | ||
| f"Could not resolve {hostname} via DNS or Netbox, using original hostname" | ||
| ) | ||
| return hostname | ||
|
|
||
|
|
||
| def get_hosts_from_group(group: str) -> list: | ||
| """Resolve an Ansible inventory group to its list of hosts. | ||
|
|
||
| Args: | ||
| group: The inventory group name to resolve | ||
|
|
||
| Returns: | ||
| Sorted list of hostnames in the group, or empty list if the | ||
| group does not exist or cannot be resolved. | ||
| """ | ||
| try: | ||
| inventory_path = get_inventory_path("/ansible/inventory/hosts.yml") | ||
| result = subprocess.check_output( | ||
| [ | ||
| "ansible-inventory", | ||
| "-i", | ||
| inventory_path, | ||
| "--list", | ||
| "--limit", | ||
| group, | ||
| ], | ||
| stderr=subprocess.DEVNULL, | ||
| ) | ||
| inventory = json.loads(result) | ||
| hosts = get_hosts_from_inventory(inventory) | ||
| return sorted(hosts) | ||
| except Exception: | ||
| logger.debug("Could not resolve group %r", group, exc_info=True) | ||
| return [] | ||
|
|
||
|
|
||
| def select_host_from_list(hosts: list) -> Optional[str]: | ||
| """Display a numbered list of hosts and let the user choose one. | ||
|
|
||
| Args: | ||
| hosts: List of hostnames to choose from | ||
|
|
||
| Returns: | ||
| The selected hostname, or None if the selection was cancelled. | ||
| """ | ||
| print(f"\nGroup contains {len(hosts)} hosts:\n") | ||
| for i, host in enumerate(hosts, 1): | ||
| print(f" {i}) {host}") | ||
| print() | ||
|
|
||
| while True: | ||
| answer = prompt("Select host [1-{}]: ".format(len(hosts))) | ||
| if answer.strip().lower() in ("q", "quit", "exit"): | ||
| return None | ||
| try: | ||
| index = int(answer.strip()) | ||
| if 1 <= index <= len(hosts): | ||
| return hosts[index - 1] | ||
| except ValueError: | ||
| pass | ||
| print(f"Please enter a number between 1 and {len(hosts)}, or 'q' to cancel.") | ||
| from osism.utils.hosts import ( # noqa: F401 | ||
| resolve_hostname_to_ip, | ||
| get_primary_ipv4_from_netbox, | ||
| resolve_host_with_fallback, | ||
| get_hosts_from_group, | ||
| select_host_from_list, | ||
| ) | ||
| from osism.utils.ssh import ( | ||
| build_clush_command, | ||
| build_ssh_command, | ||
| ensure_known_hosts_file, | ||
| KNOWN_HOSTS_PATH, | ||
| ) | ||
|
|
||
|
|
||
| class Run(Command): | ||
|
|
@@ -197,29 +67,10 @@ def take_action(self, parsed_args): | |
| type_console = "clush" | ||
| host = host[1:] | ||
|
|
||
| ssh_options = [ | ||
| "-o", | ||
| "StrictHostKeyChecking=no", | ||
| "-o", | ||
| "LogLevel=ERROR", | ||
| "-o", | ||
| f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", | ||
| ] | ||
|
|
||
| if type_console == "ansible": | ||
| subprocess.call(["/run-ansible-console.sh", host]) | ||
| elif type_console == "clush": | ||
| # SSH options (IdentityFile, StrictHostKeyChecking, LogLevel) | ||
| # are configured in clush.conf, no need to pass them here. | ||
| subprocess.call( | ||
| [ | ||
| "/usr/local/bin/clush", | ||
| "-l", | ||
| settings.OPERATOR_USER, | ||
| "-g", | ||
| host, | ||
| ] | ||
| ) | ||
| subprocess.call(build_clush_command(group=host)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. Source: opengrep |
||
| elif type_console == "ssh": | ||
| # Try to resolve as an inventory group | ||
| group_hosts = get_hosts_from_group(host) | ||
|
|
@@ -235,15 +86,7 @@ def take_action(self, parsed_args): | |
| # Resolve hostname with Netbox fallback | ||
| resolved_host = resolve_host_with_fallback(host) | ||
| # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable | ||
| subprocess.call( | ||
| [ | ||
| "/usr/bin/ssh", | ||
| "-i", | ||
| "/ansible/secrets/id_rsa.operator", | ||
| *ssh_options, | ||
| f"{settings.OPERATOR_USER}@{resolved_host}", | ||
| ] | ||
| ) | ||
| subprocess.call(build_ssh_command(resolved_host)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. Source: opengrep |
||
| elif type_console == "container_prompt": | ||
| while True: | ||
| command = prompt(f"{host[:-1]}>>> ") | ||
|
|
@@ -255,42 +98,22 @@ def take_action(self, parsed_args): | |
| resolved_host = resolve_host_with_fallback(host[:-1]) | ||
| # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable | ||
| subprocess.call( | ||
| [ | ||
| "/usr/bin/ssh", | ||
| "-i", | ||
| "/ansible/secrets/id_rsa.operator", | ||
| *ssh_options, | ||
| f"{settings.OPERATOR_USER}@{resolved_host}", | ||
| ssh_command, | ||
| ] | ||
| build_ssh_command(resolved_host, remote_command=ssh_command) | ||
| ) | ||
|
Comment on lines
100
to
102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. Source: opengrep |
||
| elif type_console == "container": | ||
| target_containername = host.split("/")[1] | ||
| target_host = host.split("/")[0] | ||
| target_command = "bash" | ||
|
|
||
| ssh_command = f"docker exec -it {shlex.quote(target_containername)} {shlex.quote(target_command)}" | ||
| ssh_options = [ | ||
| "-o", | ||
| "RequestTTY=force", | ||
| "-o", | ||
| "StrictHostKeyChecking=no", | ||
| "-o", | ||
| "LogLevel=ERROR", | ||
| "-o", | ||
| f"UserKnownHostsFile={KNOWN_HOSTS_PATH}", | ||
| ] | ||
|
|
||
| # Resolve hostname with Netbox fallback | ||
| resolved_target_host = resolve_host_with_fallback(target_host) | ||
| # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable | ||
| subprocess.call( | ||
| [ | ||
| "/usr/bin/ssh", | ||
| "-i", | ||
| "/ansible/secrets/id_rsa.operator", | ||
| *ssh_options, | ||
| f"{settings.OPERATOR_USER}@{resolved_target_host}", | ||
| ssh_command, | ||
| ] | ||
| build_ssh_command( | ||
| resolved_target_host, | ||
| remote_command=ssh_command, | ||
| request_tty=True, | ||
| ) | ||
| ) | ||
|
Comment on lines
113
to
119
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. Source: opengrep |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,8 +7,11 @@ | |
| from loguru import logger | ||
| from prompt_toolkit import prompt | ||
|
|
||
| from osism import settings | ||
| from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH | ||
| from osism.utils.ssh import ( | ||
| build_ssh_command, | ||
| ensure_known_hosts_file, | ||
| KNOWN_HOSTS_PATH, | ||
| ) | ||
|
|
||
|
|
||
| class Run(Command): | ||
|
|
@@ -33,25 +36,17 @@ def take_action(self, parsed_args): | |
| f"Could not initialize {KNOWN_HOSTS_PATH}, SSH may show warnings" | ||
| ) | ||
|
|
||
| ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}" | ||
|
|
||
| if not command: | ||
| while True: | ||
| command = prompt(f"{host}>>> ") | ||
| if command in ["Exit", "exit", "EXIT"]: | ||
| break | ||
|
|
||
| ssh_command = f"docker {command}" | ||
| remote_command = f"docker {command}" | ||
| # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable | ||
| subprocess.call( | ||
| f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}", | ||
| shell=True, | ||
| ) | ||
| subprocess.call(build_ssh_command(host, remote_command=remote_command)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. Source: opengrep |
||
| else: | ||
| ssh_command = f"docker {command}" | ||
| remote_command = f"docker {command}" | ||
|
|
||
| # FIXME: use paramiko or something else more Pythonic + make operator user + key configurable | ||
| subprocess.call( | ||
| f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}", | ||
| shell=True, | ||
| ) | ||
| subprocess.call(build_ssh_command(host, remote_command=remote_command)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. Source: opengrep |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
Source: opengrep