diff --git a/ocp_resources/resource.py b/ocp_resources/resource.py index bc16c2cf53..23d16808c2 100644 --- a/ocp_resources/resource.py +++ b/ocp_resources/resource.py @@ -48,6 +48,7 @@ ResourceTeardownError, ValidationError, ) +from ocp_resources.utils.client_config import DynamicClientWithKubeconfig, resolve_bearer_token, save_kubeconfig from ocp_resources.utils.constants import ( DEFAULT_CLUSTER_RETRY_EXCEPTIONS, NOT_FOUND_ERROR_EXCEPTION_DICT, @@ -206,6 +207,7 @@ def get_client( verify_ssl: bool | None = None, token: str | None = None, fake: bool = False, + generate_kubeconfig: bool = False, ) -> DynamicClient | FakeDynamicClient: """ Get a kubernetes client. @@ -230,6 +232,7 @@ def get_client( host (str): host for the cluster verify_ssl (bool): whether to verify ssl token (str): Use token to login + generate_kubeconfig (bool): if True, save the kubeconfig to a temporary file and add path to client kubeconfig attribute. Returns: DynamicClient: a kubernetes client. @@ -286,16 +289,32 @@ def get_client( kubernetes.client.Configuration.set_default(default=client_configuration) try: - return kubernetes.dynamic.DynamicClient(client=_client) + _dynamic_client = kubernetes.dynamic.DynamicClient(client=_client) except MaxRetryError: # Ref: https://github.com/kubernetes-client/python/blob/v26.1.0/kubernetes/base/config/incluster_config.py LOGGER.info("Trying to get client via incluster_config") - return kubernetes.dynamic.DynamicClient( + _dynamic_client = kubernetes.dynamic.DynamicClient( client=kubernetes.config.incluster_config.load_incluster_config( client_configuration=client_configuration, try_refresh_token=try_refresh_token ), ) + if generate_kubeconfig: + if config_file: + LOGGER.info(f"`generate_kubeconfig` ignored, kubeconfig already available at {config_file}") + _dynamic_client = DynamicClientWithKubeconfig(client=_dynamic_client.client, kubeconfig=config_file) + else: + _resolved_token = resolve_bearer_token(token=token, client_configuration=client_configuration) + kubeconfig_path = save_kubeconfig( + host=host or client_configuration.host, + token=_resolved_token, + config_dict=config_dict, + verify_ssl=verify_ssl, + ) + _dynamic_client = DynamicClientWithKubeconfig(client=_dynamic_client.client, kubeconfig=kubeconfig_path) + + return _dynamic_client + def sub_resource_level(current_class: Any, owner_class: Any, parent_class: Any) -> str | None: # return the name of the last class in MRO list that is not one of base diff --git a/ocp_resources/utils/client_config.py b/ocp_resources/utils/client_config.py new file mode 100644 index 0000000000..54628b5393 --- /dev/null +++ b/ocp_resources/utils/client_config.py @@ -0,0 +1,97 @@ +import atexit +import os +import tempfile +from typing import Any + +import kubernetes +import yaml +from kubernetes.dynamic import DynamicClient +from simple_logger.logger import get_logger + +LOGGER = get_logger(name=__name__) + + +class DynamicClientWithKubeconfig(DynamicClient): + def __init__(self, client: kubernetes.client.ApiClient, kubeconfig: str) -> None: + super().__init__(client=client) + self.kubeconfig = kubeconfig + + +def resolve_bearer_token( + token: str | None, + client_configuration: "kubernetes.client.Configuration", +) -> str | None: + """Extract bearer token from client configuration if not explicitly provided.""" + if token: + return token + + if client_configuration.api_key: + _bearer = client_configuration.api_key.get("authorization", "") + if _bearer.startswith("Bearer "): + return _bearer.removeprefix("Bearer ") + + return None + + +def save_kubeconfig( + host: str | None = None, + token: str | None = None, + config_dict: dict[str, Any] | None = None, + verify_ssl: bool | None = None, +) -> str: + """ + Save kubeconfig to a temporary file. + + Builds a kubeconfig from the provided parameters and writes it to a + temporary file with 0o600 permissions. + + Args: + host (str): cluster API server URL. + token (str): bearer token for authentication. + config_dict (dict): existing kubeconfig dict to save as-is. + verify_ssl (bool): if False, sets insecure-skip-tls-verify in the saved config. + + Returns: + str: path to the temporary kubeconfig file. + """ + if config_dict is not None: + _config = config_dict + elif host: + cluster_config: dict[str, Any] = {"server": host} + if verify_ssl is False: + cluster_config["insecure-skip-tls-verify"] = True + + user_config: dict[str, str] = {} + if token: + user_config["token"] = token + + _config = { + "apiVersion": "v1", + "kind": "Config", + "clusters": [{"name": "cluster", "cluster": cluster_config}], + "users": [{"name": "user", "user": user_config}], + "contexts": [{"name": "context", "context": {"cluster": "cluster", "user": "user"}}], + "current-context": "context", + } + else: + raise ValueError("Not enough data to build kubeconfig: provide config_dict or host") + + fd = None + tmp_path = None + try: + fd, tmp_path = tempfile.mkstemp(suffix=".kubeconfig") + os.fchmod(fd, 0o600) + with os.fdopen(fd, "w") as f: + fd = None + yaml.safe_dump(_config, f) + atexit.register(lambda p: os.unlink(p) if os.path.exists(p) else None, tmp_path) + LOGGER.info(f"kubeconfig saved to {tmp_path}") + return tmp_path + + except (OSError, yaml.YAMLError): + if fd is not None: + os.close(fd) + if tmp_path is not None and os.path.exists(tmp_path): + os.unlink(tmp_path) + LOGGER.error("Failed to save kubeconfig") + raise diff --git a/tests/test_resource.py b/tests/test_resource.py index d5db38afcd..bd409eca9e 100644 --- a/tests/test_resource.py +++ b/tests/test_resource.py @@ -1,13 +1,16 @@ import os from unittest.mock import patch +import kubernetes import pytest +import yaml from ocp_resources.exceptions import ResourceTeardownError from ocp_resources.namespace import Namespace from ocp_resources.pod import Pod from ocp_resources.resource import NamespacedResourceList, Resource, ResourceList from ocp_resources.secret import Secret +from ocp_resources.utils.client_config import resolve_bearer_token, save_kubeconfig BASE_NAMESPACE_NAME: str = "test-namespace" BASE_POD_NAME: str = "test-pod" @@ -200,3 +203,116 @@ def test_proxy_precedence(self, fake_client): # Verify HTTPS_PROXY takes precedence over HTTP_PROXY assert fake_client.configuration.proxy == https_proxy + + +class TestSaveKubeconfig: + def test_save_kubeconfig_with_host_and_token(self): + host = "https://api.test-cluster.example.com:6443" + token = "sha256~test-token-value" # noqa: S105 + + kubeconfig_path = save_kubeconfig(host=host, token=token, verify_ssl=False) + try: + assert os.path.exists(kubeconfig_path) + assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600 + + with open(kubeconfig_path) as f: + config = yaml.safe_load(f) + + assert config["clusters"][0]["cluster"]["server"] == host + assert config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] is True + assert config["users"][0]["user"]["token"] == token + assert config["current-context"] == "context" + finally: + os.unlink(kubeconfig_path) + + def test_save_kubeconfig_with_config_dict(self): + config_dict = { + "apiVersion": "v1", + "kind": "Config", + "clusters": [{"name": "my-cluster", "cluster": {"server": "https://my-server:6443"}}], + "users": [{"name": "my-user", "user": {"token": "my-token"}}], + "contexts": [{"name": "my-context", "context": {"cluster": "my-cluster", "user": "my-user"}}], + "current-context": "my-context", + } + + kubeconfig_path = save_kubeconfig(config_dict=config_dict) + try: + with open(kubeconfig_path) as f: + saved_config = yaml.safe_load(f) + + assert saved_config == config_dict + finally: + os.unlink(kubeconfig_path) + + def test_save_kubeconfig_insufficient_data(self): + with pytest.raises(ValueError, match="Not enough data to build kubeconfig"): + save_kubeconfig() + + def test_save_kubeconfig_file_permissions(self): + _test_token = "test-token" # noqa: S105 + + kubeconfig_path = save_kubeconfig(host="https://api.example.com:6443", token=_test_token) + try: + assert os.stat(kubeconfig_path).st_mode & 0o777 == 0o600 + finally: + os.unlink(kubeconfig_path) + + def test_save_kubeconfig_verify_ssl_not_false(self): + _test_token = "test-token" # noqa: S105 + + kubeconfig_path_true = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=True) + try: + with open(kubeconfig_path_true) as f: + config_true = yaml.safe_load(f) + + assert "insecure-skip-tls-verify" not in config_true["clusters"][0]["cluster"] + finally: + os.unlink(kubeconfig_path_true) + + kubeconfig_path_none = save_kubeconfig(host="https://api.example.com:6443", token=_test_token, verify_ssl=None) + try: + with open(kubeconfig_path_none) as f: + config_none = yaml.safe_load(f) + finally: + os.unlink(kubeconfig_path_none) + + assert "insecure-skip-tls-verify" not in config_none["clusters"][0]["cluster"] + + def testresolve_bearer_token_from_api_key(self): + """Test that resolve_bearer_token extracts token from Bearer api_key.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {"authorization": "Bearer sha256~oauth-resolved-token"} # noqa: S105 + result = resolve_bearer_token(token=None, client_configuration=cfg) + assert result == "sha256~oauth-resolved-token" + + def testresolve_bearer_token_explicit_takes_precedence(self): + """Test that an explicit token takes precedence over Bearer in api_key.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {"authorization": "Bearer sha256~oauth-token"} # noqa: S105 + explicit_token = "explicit-token" # noqa: S105 + result = resolve_bearer_token(token=explicit_token, client_configuration=cfg) + assert result == "explicit-token" + + def testresolve_bearer_token_no_bearer_prefix(self): + """Test that api_key without Bearer prefix does not resolve a token.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {"authorization": "Basic some-basic-auth"} + result = resolve_bearer_token(token=None, client_configuration=cfg) + assert result is None + + def testresolve_bearer_token_empty_api_key(self): + """Test that empty api_key does not resolve a token.""" + cfg = kubernetes.client.Configuration() + cfg.api_key = {} + result = resolve_bearer_token(token=None, client_configuration=cfg) + assert result is None + + def test_save_kubeconfig_write_failure(self): + _test_token = "test-token" # noqa: S105 + + with pytest.raises(OSError, match="Permission denied"): + with patch( + "ocp_resources.utils.client_config.tempfile.mkstemp", + side_effect=OSError("Permission denied"), + ): + save_kubeconfig(host="https://api.example.com:6443", token=_test_token)