Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions ocp_resources/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
ResourceTeardownError,
ValidationError,
)
from ocp_resources.utils.client_config import DynamicClientWithKubeconfig, resolve_bearer_token, save_kubeconfig
from ocp_resources.utils.constants import (
DEFAULT_CLUSTER_RETRY_EXCEPTIONS,
NOT_FOUND_ERROR_EXCEPTION_DICT,
Expand Down Expand Up @@ -206,6 +207,7 @@ def get_client(
verify_ssl: bool | None = None,
token: str | None = None,
fake: bool = False,
generate_kubeconfig: bool = False,
) -> DynamicClient | FakeDynamicClient:
"""
Get a kubernetes client.
Expand All @@ -230,6 +232,7 @@ def get_client(
host (str): host for the cluster
verify_ssl (bool): whether to verify ssl
token (str): Use token to login
generate_kubeconfig (bool): if True, save the kubeconfig to a temporary file and add path to client kubeconfig attribute.
Returns:
DynamicClient: a kubernetes client.
Expand Down Expand Up @@ -286,16 +289,32 @@ def get_client(
kubernetes.client.Configuration.set_default(default=client_configuration)

try:
return kubernetes.dynamic.DynamicClient(client=_client)
_dynamic_client = kubernetes.dynamic.DynamicClient(client=_client)
except MaxRetryError:
# Ref: https://github.com/kubernetes-client/python/blob/v26.1.0/kubernetes/base/config/incluster_config.py
LOGGER.info("Trying to get client via incluster_config")
return kubernetes.dynamic.DynamicClient(
_dynamic_client = kubernetes.dynamic.DynamicClient(
client=kubernetes.config.incluster_config.load_incluster_config(
client_configuration=client_configuration, try_refresh_token=try_refresh_token
),
)

if generate_kubeconfig:
if config_file:
LOGGER.info(f"`generate_kubeconfig` ignored, kubeconfig already available at {config_file}")
_dynamic_client = DynamicClientWithKubeconfig(client=_dynamic_client.client, kubeconfig=config_file)
else:
_resolved_token = resolve_bearer_token(token=token, client_configuration=client_configuration)
kubeconfig_path = save_kubeconfig(
host=host or client_configuration.host,
token=_resolved_token,
config_dict=config_dict,
verify_ssl=verify_ssl,
)
_dynamic_client = DynamicClientWithKubeconfig(client=_dynamic_client.client, kubeconfig=kubeconfig_path)

return _dynamic_client


def sub_resource_level(current_class: Any, owner_class: Any, parent_class: Any) -> str | None:
# return the name of the last class in MRO list that is not one of base
Expand Down
97 changes: 97 additions & 0 deletions ocp_resources/utils/client_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import atexit
import os
import tempfile
from typing import Any

import kubernetes
import yaml
from kubernetes.dynamic import DynamicClient
from simple_logger.logger import get_logger

LOGGER = get_logger(name=__name__)


class DynamicClientWithKubeconfig(DynamicClient):
def __init__(self, client: kubernetes.client.ApiClient, kubeconfig: str) -> None:
super().__init__(client=client)
self.kubeconfig = kubeconfig


def resolve_bearer_token(
token: str | None,
client_configuration: "kubernetes.client.Configuration",
) -> str | None:
"""Extract bearer token from client configuration if not explicitly provided."""
if token:
return token

if client_configuration.api_key:
_bearer = client_configuration.api_key.get("authorization", "")
if _bearer.startswith("Bearer "):
return _bearer.removeprefix("Bearer ")

return None


def save_kubeconfig(
host: str | None = None,
token: str | None = None,
config_dict: dict[str, Any] | None = None,
verify_ssl: bool | None = None,
) -> str:
"""
Save kubeconfig to a temporary file.

Builds a kubeconfig from the provided parameters and writes it to a
temporary file with 0o600 permissions.

Args:
host (str): cluster API server URL.
token (str): bearer token for authentication.
config_dict (dict): existing kubeconfig dict to save as-is.
verify_ssl (bool): if False, sets insecure-skip-tls-verify in the saved config.

Returns:
str: path to the temporary kubeconfig file.
"""
if config_dict is not None:
_config = config_dict
elif host:
cluster_config: dict[str, Any] = {"server": host}
if verify_ssl is False:
cluster_config["insecure-skip-tls-verify"] = True

user_config: dict[str, str] = {}
if token:
user_config["token"] = token

_config = {
"apiVersion": "v1",
"kind": "Config",
"clusters": [{"name": "cluster", "cluster": cluster_config}],
"users": [{"name": "user", "user": user_config}],
"contexts": [{"name": "context", "context": {"cluster": "cluster", "user": "user"}}],
"current-context": "context",
}
else:
raise ValueError("Not enough data to build kubeconfig: provide config_dict or host")

fd = None
tmp_path = None
try:
fd, tmp_path = tempfile.mkstemp(suffix=".kubeconfig")
os.fchmod(fd, 0o600)
with os.fdopen(fd, "w") as f:
fd = None
yaml.safe_dump(_config, f)
atexit.register(lambda p: os.unlink(p) if os.path.exists(p) else None, tmp_path)
LOGGER.info(f"kubeconfig saved to {tmp_path}")
return tmp_path

except (OSError, yaml.YAMLError):
if fd is not None:
os.close(fd)
if tmp_path is not None and os.path.exists(tmp_path):
os.unlink(tmp_path)
LOGGER.error("Failed to save kubeconfig")
raise
116 changes: 116 additions & 0 deletions tests/test_resource.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import os
from unittest.mock import patch

import kubernetes
import pytest
import yaml

from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
from ocp_resources.resource import NamespacedResourceList, Resource, ResourceList
from ocp_resources.secret import Secret
from ocp_resources.utils.client_config import resolve_bearer_token, save_kubeconfig

BASE_NAMESPACE_NAME: str = "test-namespace"
BASE_POD_NAME: str = "test-pod"
Expand Down Expand Up @@ -200,3 +203,116 @@ def test_proxy_precedence(self, fake_client):

# Verify HTTPS_PROXY takes precedence over HTTP_PROXY
assert fake_client.configuration.proxy == https_proxy


class TestSaveKubeconfig:
def test_save_kubeconfig_with_host_and_token(self):
host = "https://api.test-cluster.example.com:6443"
token = "sha256~test-token-value" # noqa: S105

kubeconfig_path = save_kubeconfig(host=host, token=token, verify_ssl=False)
try:
assert os.path.exists(kubeconfig_path)
assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600

with open(kubeconfig_path) as f:
config = yaml.safe_load(f)

assert config["clusters"][0]["cluster"]["server"] == host
assert config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] is True
assert config["users"][0]["user"]["token"] == token
assert config["current-context"] == "context"
finally:
os.unlink(kubeconfig_path)

def test_save_kubeconfig_with_config_dict(self):
config_dict = {
"apiVersion": "v1",
"kind": "Config",
"clusters": [{"name": "my-cluster", "cluster": {"server": "https://my-server:6443"}}],
"users": [{"name": "my-user", "user": {"token": "my-token"}}],
"contexts": [{"name": "my-context", "context": {"cluster": "my-cluster", "user": "my-user"}}],
"current-context": "my-context",
}

kubeconfig_path = save_kubeconfig(config_dict=config_dict)
try:
with open(kubeconfig_path) as f:
saved_config = yaml.safe_load(f)

assert saved_config == config_dict
finally:
os.unlink(kubeconfig_path)

def test_save_kubeconfig_insufficient_data(self):
with pytest.raises(ValueError, match="Not enough data to build kubeconfig"):
save_kubeconfig()

def test_save_kubeconfig_file_permissions(self):
_test_token = "test-token" # noqa: S105

kubeconfig_path = save_kubeconfig(host="https://api.example.com:6443", token=_test_token)
try:
assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600
finally:
os.unlink(kubeconfig_path)

def test_save_kubeconfig_verify_ssl_not_false(self):
_test_token = "test-token" # noqa: S105

kubeconfig_path_true = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=True)
try:
with open(kubeconfig_path_true) as f:
config_true = yaml.safe_load(f)

assert "insecure-skip-tls-verify" not in config_true["clusters"][0]["cluster"]
finally:
os.unlink(kubeconfig_path_true)

kubeconfig_path_none = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=None)
try:
with open(kubeconfig_path_none) as f:
config_none = yaml.safe_load(f)
finally:
os.unlink(kubeconfig_path_none)

assert "insecure-skip-tls-verify" not in config_none["clusters"][0]["cluster"]

def testresolve_bearer_token_from_api_key(self):
"""Test that resolve_bearer_token extracts token from Bearer api_key."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {"authorization": "Bearer sha256~oauth-resolved-token"} # noqa: S105
result = resolve_bearer_token(token=None, client_configuration=cfg)
assert result == "sha256~oauth-resolved-token"

def testresolve_bearer_token_explicit_takes_precedence(self):
"""Test that an explicit token takes precedence over Bearer in api_key."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {"authorization": "Bearer sha256~oauth-token"} # noqa: S105
explicit_token = "explicit-token" # noqa: S105
result = resolve_bearer_token(token=explicit_token, client_configuration=cfg)
assert result == "explicit-token"

def testresolve_bearer_token_no_bearer_prefix(self):
"""Test that api_key without Bearer prefix does not resolve a token."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {"authorization": "Basic some-basic-auth"}
result = resolve_bearer_token(token=None, client_configuration=cfg)
assert result is None

def testresolve_bearer_token_empty_api_key(self):
"""Test that empty api_key does not resolve a token."""
cfg = kubernetes.client.Configuration()
cfg.api_key = {}
result = resolve_bearer_token(token=None, client_configuration=cfg)
assert result is None

def test_save_kubeconfig_write_failure(self):
_test_token = "test-token" # noqa: S105

with pytest.raises(OSError, match="Permission denied"):
with patch(
"ocp_resources.utils.client_config.tempfile.mkstemp",
side_effect=OSError("Permission denied"),
):
save_kubeconfig(host="https://api.example.com:6443", token=_test_token)