Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import threading
from abc import ABC, ABCMeta, abstractmethod
from typing import Any, Iterable

Expand All @@ -16,12 +17,28 @@ class StreamSlicerMeta(ABCMeta):
isinstance(declarative_stream.retriever.stream_slicer,(GlobalSubstreamCursor, PerPartitionWithGlobalCursor))
"""

def __instancecheck__(cls, instance: Any) -> bool:
# Check if it's our wrapper with matching wrapped class
if hasattr(instance, "wrapped_slicer"):
return isinstance(instance.wrapped_slicer, cls)
_checking: threading.local = threading.local()

return super().__instancecheck__(instance)
def __instancecheck__(cls, instance: Any) -> bool:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a false positive. __instancecheck__ is a method on a metaclass (ABCMeta subclass), so its first parameter is the class being checked against, not an instance — cls is the correct name here. Using self would be misleading. The original code (introduced in #567) also used cls.

if not hasattr(cls._checking, "in_progress"):
cls._checking.in_progress = set()

instance_id = id(instance)
if instance_id in cls._checking.in_progress:
return super().__instancecheck__(instance)

# Use object.__getattribute__ to bypass any custom __getattr__ that
# could trigger further isinstance() calls and cause recursion.
try:
wrapped = object.__getattribute__(instance, "wrapped_slicer")
except AttributeError:
return super().__instancecheck__(instance)

cls._checking.in_progress.add(instance_id)
try:
return isinstance(wrapped, cls)
finally:
cls._checking.in_progress.discard(instance_id)


class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
from datetime import timedelta
from typing import Any, Iterable
from unittest.mock import Mock

import pytest
Expand Down Expand Up @@ -251,3 +252,51 @@ def test_slice_limiting_functionality():
slices = list(wrapped_slicer.stream_slices())
assert len(slices) == 3
assert slices == mock_slicer.stream_slices.return_value[:3]


def test_no_recursion_error_with_wrapped_slicer_attribute_on_non_decorator():
"""Verify that isinstance does not cause RecursionError on objects with a wrapped_slicer attribute and custom __getattr__."""

class SlicerWithGetattr(StreamSlicer):
def __init__(self, inner: StreamSlicer) -> None:
self.wrapped_slicer = inner

def stream_slices(self) -> Iterable[StreamSlice]:
return self.wrapped_slicer.stream_slices()

def __getattr__(self, name: str) -> Any:
return getattr(self.wrapped_slicer, name)

inner = SinglePartitionRouter(parameters={})
outer = SlicerWithGetattr(inner)

# This would raise RecursionError before the fix
assert isinstance(outer, StreamSlicer)
assert isinstance(outer, SlicerWithGetattr)
assert not isinstance(outer, SubstreamPartitionRouter)


def test_no_recursion_error_with_nested_decorators():
"""Verify that double-wrapping a slicer does not cause RecursionError."""
inner = SinglePartitionRouter(parameters={})
first_wrap = StreamSlicerTestReadDecorator(wrapped_slicer=inner, maximum_number_of_slices=5)
second_wrap = StreamSlicerTestReadDecorator(
wrapped_slicer=first_wrap, maximum_number_of_slices=3
)

assert isinstance(second_wrap, SinglePartitionRouter)
assert isinstance(second_wrap, StreamSlicerTestReadDecorator)
assert isinstance(first_wrap, SinglePartitionRouter)
assert not isinstance(second_wrap, SubstreamPartitionRouter)


def test_no_recursion_error_with_self_referencing_wrapped_slicer():
"""Verify that a circular wrapped_slicer reference does not cause RecursionError."""
mock_slicer = Mock(spec=StreamSlicer)
wrapped = StreamSlicerTestReadDecorator(wrapped_slicer=mock_slicer, maximum_number_of_slices=5)

# Simulate a pathological cycle: wrapped_slicer points back to itself
object.__setattr__(wrapped, "wrapped_slicer", wrapped)

# The key assertion: this must not raise RecursionError
assert isinstance(wrapped, StreamSlicerTestReadDecorator)
Loading