diff --git a/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py b/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py index c4e07622b..3d8fcaa70 100644 --- a/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py +++ b/airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py @@ -1,5 +1,6 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. +import threading from abc import ABC, ABCMeta, abstractmethod from typing import Any, Iterable @@ -16,12 +17,28 @@ class StreamSlicerMeta(ABCMeta): isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor)) """ - def __instancecheck__(cls, instance: Any) -> bool: - # Check if it's our wrapper with matching wrapped class - if hasattr(instance, "wrapped_slicer"): - return isinstance(instance.wrapped_slicer, cls) + _checking: threading.local = threading.local() - return super().__instancecheck__(instance) + def __instancecheck__(cls, instance: Any) -> bool: + if not hasattr(cls._checking, "in_progress"): + cls._checking.in_progress = set() + + instance_id = id(instance) + if instance_id in cls._checking.in_progress: + return super().__instancecheck__(instance) + + # Use object.__getattribute__ to bypass any custom __getattr__ that + # could trigger further isinstance() calls and cause recursion. + try: + wrapped = object.__getattribute__(instance, "wrapped_slicer") + except AttributeError: + return super().__instancecheck__(instance) + + cls._checking.in_progress.add(instance_id) + try: + return isinstance(wrapped, cls) + finally: + cls._checking.in_progress.discard(instance_id) class StreamSlicer(ABC, metaclass=StreamSlicerMeta): diff --git a/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py b/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py index 9cabae283..dfa17fb34 100644 --- a/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py +++ b/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py @@ -2,6 +2,7 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # from datetime import timedelta +from typing import Any, Iterable from unittest.mock import Mock import pytest @@ -251,3 +252,51 @@ def test_slice_limiting_functionality(): slices = list(wrapped_slicer.stream_slices()) assert len(slices) == 3 assert slices == mock_slicer.stream_slices.return_value[:3] + + +def test_no_recursion_error_with_wrapped_slicer_attribute_on_non_decorator(): + """Verify that isinstance does not cause RecursionError on objects with a wrapped_slicer attribute and custom __getattr__.""" + + class SlicerWithGetattr(StreamSlicer): + def __init__(self, inner: StreamSlicer) -> None: + self.wrapped_slicer = inner + + def stream_slices(self) -> Iterable[StreamSlice]: + return self.wrapped_slicer.stream_slices() + + def __getattr__(self, name: str) -> Any: + return getattr(self.wrapped_slicer, name) + + inner = SinglePartitionRouter(parameters={}) + outer = SlicerWithGetattr(inner) + + # This would raise RecursionError before the fix + assert isinstance(outer, StreamSlicer) + assert isinstance(outer, SlicerWithGetattr) + assert not isinstance(outer, SubstreamPartitionRouter) + + +def test_no_recursion_error_with_nested_decorators(): + """Verify that double-wrapping a slicer does not cause RecursionError.""" + inner = SinglePartitionRouter(parameters={}) + first_wrap = StreamSlicerTestReadDecorator(wrapped_slicer=inner, maximum_number_of_slices=5) + second_wrap = StreamSlicerTestReadDecorator( + wrapped_slicer=first_wrap, maximum_number_of_slices=3 + ) + + assert isinstance(second_wrap, SinglePartitionRouter) + assert isinstance(second_wrap, StreamSlicerTestReadDecorator) + assert isinstance(first_wrap, SinglePartitionRouter) + assert not isinstance(second_wrap, SubstreamPartitionRouter) + + +def test_no_recursion_error_with_self_referencing_wrapped_slicer(): + """Verify that a circular wrapped_slicer reference does not cause RecursionError.""" + mock_slicer = Mock(spec=StreamSlicer) + wrapped = StreamSlicerTestReadDecorator(wrapped_slicer=mock_slicer, maximum_number_of_slices=5) + + # Simulate a pathological cycle: wrapped_slicer points back to itself + object.__setattr__(wrapped, "wrapped_slicer", wrapped) + + # The key assertion: this must not raise RecursionError + assert isinstance(wrapped, StreamSlicerTestReadDecorator)