Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions osism/commands/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
from cliff.command import Command
from loguru import logger

from osism import settings
from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH
from osism.utils.ssh import (
build_ssh_command,
ensure_known_hosts_file,
KNOWN_HOSTS_PATH,
)


class Run(Command):
Expand All @@ -33,13 +36,9 @@ def take_action(self, parsed_args):
f"Could not initialize {KNOWN_HOSTS_PATH}, SSH may show warnings"
)

ssh_command = (
remote_command = (
f"docker compose --project-directory=/opt/{environment} {arguments}"
)
ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}"

# FIXME: use paramiko or something else more Pythonic + make operator user + key configurable
subprocess.call(
f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} '{ssh_command}'",
shell=True,
)
subprocess.call(build_ssh_command(host, remote_command=remote_command))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

219 changes: 21 additions & 198 deletions osism/commands/console.py
Original file line number Diff line number Diff line change
@@ -1,155 +1,25 @@
# SPDX-License-Identifier: Apache-2.0

import json
import shlex
import socket
import subprocess
from typing import Optional

from cliff.command import Command
from loguru import logger
from prompt_toolkit import prompt

from osism import settings, utils
from osism.utils.inventory import get_hosts_from_inventory, get_inventory_path
from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH


def resolve_hostname_to_ip(hostname: str) -> Optional[str]:
"""
Attempt to resolve hostname to IPv4 address using DNS.

Args:
hostname: The hostname to resolve

Returns:
IPv4 address string if successful, None if resolution fails
"""
try:
ip_address = socket.gethostbyname(hostname)
logger.debug(f"Resolved hostname {hostname} to {ip_address}")
return ip_address
except socket.gaierror as e:
logger.debug(f"DNS resolution failed for {hostname}: {e}")
return None


def get_primary_ipv4_from_netbox(hostname: str) -> Optional[str]:
"""
Retrieve primary IPv4 address for hostname from Netbox.

Args:
hostname: The hostname to look up in Netbox

Returns:
Primary IPv4 address string if found, None otherwise
"""
if not utils.nb:
logger.debug("Netbox integration not available")
return None

try:
device = utils.nb.dcim.devices.get(name=hostname)
if device and device.primary_ip4:
ip_address = str(device.primary_ip4.address).split("/")[0]
logger.info(f"Found primary IPv4 for {hostname} in Netbox: {ip_address}")
return ip_address
else:
logger.debug(f"No device or primary IPv4 found for {hostname} in Netbox")
return None
except Exception as e:
logger.warning(f"Error querying Netbox for {hostname}: {e}")
return None


def resolve_host_with_fallback(hostname: str) -> str:
"""
Resolve hostname with Netbox fallback.

First attempts DNS resolution. If that fails and Netbox integration is enabled,
attempts to retrieve the primary IPv4 address from Netbox.

Args:
hostname: The hostname to resolve

Returns:
Resolved IP address or original hostname if all resolution attempts fail
"""
# First try DNS resolution
ip_address = resolve_hostname_to_ip(hostname)
if ip_address:
return ip_address

# Fallback to Netbox if DNS resolution failed
logger.info(f"DNS resolution failed for {hostname}, trying Netbox fallback")
netbox_ip = get_primary_ipv4_from_netbox(hostname)
if netbox_ip:
logger.info(f"Using IPv4 address {netbox_ip} from Netbox for {hostname}")
return netbox_ip

# If both methods fail, return original hostname and let SSH handle the error
logger.warning(
f"Could not resolve {hostname} via DNS or Netbox, using original hostname"
)
return hostname


def get_hosts_from_group(group: str) -> list:
"""Resolve an Ansible inventory group to its list of hosts.

Args:
group: The inventory group name to resolve

Returns:
Sorted list of hostnames in the group, or empty list if the
group does not exist or cannot be resolved.
"""
try:
inventory_path = get_inventory_path("/ansible/inventory/hosts.yml")
result = subprocess.check_output(
[
"ansible-inventory",
"-i",
inventory_path,
"--list",
"--limit",
group,
],
stderr=subprocess.DEVNULL,
)
inventory = json.loads(result)
hosts = get_hosts_from_inventory(inventory)
return sorted(hosts)
except Exception:
logger.debug("Could not resolve group %r", group, exc_info=True)
return []


def select_host_from_list(hosts: list) -> Optional[str]:
"""Display a numbered list of hosts and let the user choose one.

Args:
hosts: List of hostnames to choose from

Returns:
The selected hostname, or None if the selection was cancelled.
"""
print(f"\nGroup contains {len(hosts)} hosts:\n")
for i, host in enumerate(hosts, 1):
print(f" {i}) {host}")
print()

while True:
answer = prompt("Select host [1-{}]: ".format(len(hosts)))
if answer.strip().lower() in ("q", "quit", "exit"):
return None
try:
index = int(answer.strip())
if 1 <= index <= len(hosts):
return hosts[index - 1]
except ValueError:
pass
print(f"Please enter a number between 1 and {len(hosts)}, or 'q' to cancel.")
from osism.utils.hosts import ( # noqa: F401
resolve_hostname_to_ip,
get_primary_ipv4_from_netbox,
resolve_host_with_fallback,
get_hosts_from_group,
select_host_from_list,
)
from osism.utils.ssh import (
build_clush_command,
build_ssh_command,
ensure_known_hosts_file,
KNOWN_HOSTS_PATH,
)


class Run(Command):
Expand Down Expand Up @@ -197,29 +67,10 @@ def take_action(self, parsed_args):
type_console = "clush"
host = host[1:]

ssh_options = [
"-o",
"StrictHostKeyChecking=no",
"-o",
"LogLevel=ERROR",
"-o",
f"UserKnownHostsFile={KNOWN_HOSTS_PATH}",
]

if type_console == "ansible":
subprocess.call(["/run-ansible-console.sh", host])
elif type_console == "clush":
# SSH options (IdentityFile, StrictHostKeyChecking, LogLevel)
# are configured in clush.conf, no need to pass them here.
subprocess.call(
[
"/usr/local/bin/clush",
"-l",
settings.OPERATOR_USER,
"-g",
host,
]
)
subprocess.call(build_clush_command(group=host))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

elif type_console == "ssh":
# Try to resolve as an inventory group
group_hosts = get_hosts_from_group(host)
Expand All @@ -235,15 +86,7 @@ def take_action(self, parsed_args):
# Resolve hostname with Netbox fallback
resolved_host = resolve_host_with_fallback(host)
# FIXME: use paramiko or something else more Pythonic + make operator user + key configurable
subprocess.call(
[
"/usr/bin/ssh",
"-i",
"/ansible/secrets/id_rsa.operator",
*ssh_options,
f"{settings.OPERATOR_USER}@{resolved_host}",
]
)
subprocess.call(build_ssh_command(resolved_host))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

elif type_console == "container_prompt":
while True:
command = prompt(f"{host[:-1]}>>> ")
Expand All @@ -255,42 +98,22 @@ def take_action(self, parsed_args):
resolved_host = resolve_host_with_fallback(host[:-1])
# FIXME: use paramiko or something else more Pythonic + make operator user + key configurable
subprocess.call(
[
"/usr/bin/ssh",
"-i",
"/ansible/secrets/id_rsa.operator",
*ssh_options,
f"{settings.OPERATOR_USER}@{resolved_host}",
ssh_command,
]
build_ssh_command(resolved_host, remote_command=ssh_command)
)
Comment on lines 100 to 102
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

elif type_console == "container":
target_containername = host.split("/")[1]
target_host = host.split("/")[0]
target_command = "bash"

ssh_command = f"docker exec -it {shlex.quote(target_containername)} {shlex.quote(target_command)}"
ssh_options = [
"-o",
"RequestTTY=force",
"-o",
"StrictHostKeyChecking=no",
"-o",
"LogLevel=ERROR",
"-o",
f"UserKnownHostsFile={KNOWN_HOSTS_PATH}",
]

# Resolve hostname with Netbox fallback
resolved_target_host = resolve_host_with_fallback(target_host)
# FIXME: use paramiko or something else more Pythonic + make operator user + key configurable
subprocess.call(
[
"/usr/bin/ssh",
"-i",
"/ansible/secrets/id_rsa.operator",
*ssh_options,
f"{settings.OPERATOR_USER}@{resolved_target_host}",
ssh_command,
]
build_ssh_command(
resolved_target_host,
remote_command=ssh_command,
request_tty=True,
)
)
Comment on lines 113 to 119
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

23 changes: 9 additions & 14 deletions osism/commands/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
from loguru import logger
from prompt_toolkit import prompt

from osism import settings
from osism.utils.ssh import ensure_known_hosts_file, KNOWN_HOSTS_PATH
from osism.utils.ssh import (
build_ssh_command,
ensure_known_hosts_file,
KNOWN_HOSTS_PATH,
)


class Run(Command):
Expand All @@ -33,25 +36,17 @@ def take_action(self, parsed_args):
f"Could not initialize {KNOWN_HOSTS_PATH}, SSH may show warnings"
)

ssh_options = f"-o StrictHostKeyChecking=no -o LogLevel=ERROR -o UserKnownHostsFile={KNOWN_HOSTS_PATH}"

if not command:
while True:
command = prompt(f"{host}>>> ")
if command in ["Exit", "exit", "EXIT"]:
break

ssh_command = f"docker {command}"
remote_command = f"docker {command}"
# FIXME: use paramiko or something else more Pythonic + make operator user + key configurable
subprocess.call(
f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}",
shell=True,
)
subprocess.call(build_ssh_command(host, remote_command=remote_command))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

else:
ssh_command = f"docker {command}"
remote_command = f"docker {command}"

# FIXME: use paramiko or something else more Pythonic + make operator user + key configurable
subprocess.call(
f"/usr/bin/ssh -i /ansible/secrets/id_rsa.operator {ssh_options} {settings.OPERATOR_USER}@{host} {ssh_command}",
shell=True,
)
subprocess.call(build_ssh_command(host, remote_command=remote_command))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security (python.lang.security.audit.dangerous-subprocess-use-audit): Detected subprocess function 'call' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

Source: opengrep

Loading