diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py index d46fae06..2d941af7 100644 --- a/tests/integration/_utils.py +++ b/tests/integration/_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations import asyncio -from typing import TYPE_CHECKING, TypeVar +from typing import TYPE_CHECKING, Literal, TypeVar from crawlee._utils.crypto import crypto_random_object_id @@ -13,26 +13,39 @@ T = TypeVar('T') -async def call_with_exp_backoff(fn: Callable[[], Awaitable[T]], *, max_retries: int = 3) -> T | None: +async def call_with_exp_backoff( + fn: Callable[[], Awaitable[T]], + *, + rq_access_mode: Literal['single', 'shared'], + max_retries: int = 3, +) -> T | None: """Call an async callable with exponential backoff retries until it returns a truthy value. In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with exponential backoff to handle that delay in integration tests. + + When `rq_access_mode` is `'single'`, the function is called once without retries. 
""" - result = None + if rq_access_mode == 'single': + return await fn() + + if rq_access_mode == 'shared': + result = None + + for attempt in range(max_retries): + result = await fn() - for attempt in range(max_retries): - result = await fn() + if result: + return result - if result: - return result + delay = 2**attempt + Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') + await asyncio.sleep(delay) - delay = 2**attempt - Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) + return result - return result + raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}') def generate_unique_resource_name(label: str) -> str: diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index eb395d42..8011d8f1 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -25,6 +25,9 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata +# In shared mode, there is a propagation delay between operations so we use test helper +# `call_with_exp_backoff` for exponential backoff. See https://github.com/apify/apify-sdk-python/issues/808. 
+ async def test_add_and_fetch_requests( request_queue_apify: RequestQueue, @@ -43,7 +46,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -57,10 +60,7 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ -81,7 +81,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -95,10 +95,7 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ 
-123,7 +120,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -138,16 +135,17 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' -async def test_forefront_requests_ordering(request_queue_apify: RequestQueue) -> None: +async def test_forefront_requests_ordering( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test that forefront requests are processed before regular requests.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -164,9 +162,9 @@ async def test_forefront_requests_ordering(request_queue_apify: RequestQueue) -> total_count = await rq.get_total_count() Actor.log.info(f'Added 2 forefront requests, total in queue: {total_count}') - # Fetch requests and verify order + # Fetch requests and verify order. 
fetched_urls = [] - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -185,8 +183,12 @@ async def test_forefront_requests_ordering(request_queue_apify: RequestQueue) -> ) -async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) -> None: +async def test_request_unique_key_behavior( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test behavior of custom unique keys.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -213,10 +215,10 @@ async def test_request_unique_key_behavior(request_queue_apify: RequestQueue) -> assert result2.was_already_present is True, f'result2.was_already_present={result2.was_already_present}' assert result3.was_already_present is False, f'result3.was_already_present={result3.was_already_present}' - # Only 2 requests should be fetchable + # Only 2 requests should be fetchable. fetched_count = 0 fetched_requests = [] - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -246,8 +248,8 @@ async def test_request_reclaim_functionality( await rq.add_request('https://example.com/test') Actor.log.info('Added test request') - # Fetch and reclaim the request - fetched_request = await rq.fetch_next_request() + # Fetch and reclaim the request. 
+ fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -259,12 +261,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Request reclaimed successfully') # Should be able to fetch the same request again. - # In shared mode, there is a propagation delay before the reclaimed request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). - if rq_access_mode == 'shared': - request2 = await call_with_exp_backoff(rq.fetch_next_request) - else: - request2 = await rq.fetch_next_request() + request2 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request2 is not None assert request2.url == fetched_request.url @@ -273,10 +270,7 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True @@ -297,8 +291,8 @@ async def test_request_reclaim_with_forefront( await rq.add_request('https://example.com/3') Actor.log.info('Added 3 requests') - # Fetch first request - first_request = await rq.fetch_next_request() + # Fetch first request. + first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -307,12 +301,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Request reclaimed to forefront') # The reclaimed request should be fetched first again. - # In shared mode, there is a propagation delay before the reclaimed request becomes visible - # (see https://github.com/apify/apify-sdk-python/issues/808). 
- if rq_access_mode == 'shared': - next_request = await call_with_exp_backoff(rq.fetch_next_request) - else: - next_request = await rq.fetch_next_request() + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert next_request is not None assert next_request.url == first_request.url @@ -330,25 +319,29 @@ async def test_request_reclaim_with_forefront( Actor.log.info(f'Test completed - processed {remaining_count} additional requests') -async def test_complex_request_objects(request_queue_apify: RequestQueue) -> None: +async def test_complex_request_objects( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test handling complex Request objects with various properties.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') # Create request with various properties - request = Request.from_url( + complex_request = Request.from_url( 'https://example.com/api/data', method='POST', headers={'Authorization': 'Bearer token123', 'Content-Type': 'application/json'}, user_data={'category': 'api', 'priority': 'high'}, unique_key='api-request-1', ) - await rq.add_request(request) - Actor.log.info(f'Added complex request: {request.url} with method {request.method}') + await rq.add_request(complex_request) + Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') - # Fetch and verify all properties are preserved - fetched_request = await rq.fetch_next_request() + # Fetch and verify all properties are preserved. 
+ fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -398,8 +391,12 @@ async def test_get_request_by_unique_key(request_queue_apify: RequestQueue) -> N Actor.log.info('Non-existent unique_key correctly returned None') -async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None: +async def test_metadata_tracking( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test request queue metadata and counts.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -422,11 +419,11 @@ async def test_metadata_tracking(request_queue_apify: RequestQueue) -> None: assert total_after_add == 5, f'total_after_add={total_after_add}' assert handled_after_add == 0, f'handled_after_add={handled_after_add}' - # Process some requests + # Process some requests. for _ in range(3): - request = await rq.fetch_next_request() - if request: - await rq.mark_request_as_handled(request) + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + if next_request: + await rq.mark_request_as_handled(next_request) Actor.log.info('Processed 3 requests') @@ -463,9 +460,9 @@ async def test_batch_operations_performance( assert total_count == 50, f'total_count={total_count}' assert handled_count == 0, f'handled_count={handled_count}' - # Process all requests + # Process all requests. 
processed_count = 0 - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -474,10 +471,7 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' @@ -500,12 +494,12 @@ async def test_state_consistency( initial_total = await rq.get_total_count() Actor.log.info(f'Initial total count: {initial_total}') - # Simulate some requests being processed and others being reclaimed + # Simulate some requests being processed and others being reclaimed. processed_requests = [] reclaimed_requests = [] for i in range(5): - next_request = await rq.fetch_next_request() + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -530,37 +524,35 @@ async def test_state_consistency( ) assert current_total == 10, f'current_total={current_total}' - # Process remaining requests + # Process remaining requests. 
 remaining_count = 0 -    while next_request := await rq.fetch_next_request(): +    while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') -    if rq_access_mode == 'shared': -        is_finished = await call_with_exp_backoff(rq.is_finished) -    else: -        is_finished = await rq.is_finished() +    is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' -async def test_empty_rq_behavior(request_queue_apify: RequestQueue) -> None: +async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: """Test behavior with empty queues.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await rq.is_empty() - is_finished = await rq.is_finished() + is_empty = await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode) + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - request = await rq.fetch_next_request() - Actor.log.info(f'Fetch result from empty queue: {request}') - assert request is None, f'request={request}' + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + Actor.log.info(f'Fetch result from empty queue: {next_request}') + assert next_request is None, f'next_request={next_request}' # Check metadata for empty queue metadata = await rq.get_metadata() @@ -597,30 +589,26 @@ async def test_large_batch_operations( total_count = await
rq.get_total_count() assert total_count == 500, f'total_count={total_count}' - # Process all in chunks to test performance + # Process all in chunks to test performance. processed_count = 0 - while not await rq.is_empty(): - next_request = await rq.fetch_next_request() - - # The RQ is_empty should ensure we don't get None - assert next_request is not None, f'next_request={next_request}' - + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished is True, f'is_finished={is_finished}' -async def test_mixed_string_and_request_objects(request_queue_apify: RequestQueue) -> None: +async def test_mixed_string_and_request_objects( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test adding both string URLs and Request objects.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -647,9 +635,9 @@ async def test_mixed_string_and_request_objects(request_queue_apify: RequestQueu total_count = await rq.get_total_count() Actor.log.info(f'Total requests in queue: {total_count}') - # Fetch and verify all types work + # Fetch and verify all types work. 
fetched_requests = [] - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -667,8 +655,12 @@ async def test_mixed_string_and_request_objects(request_queue_apify: RequestQueu Actor.log.info('Mixed types verified - found request object with user_data') -async def test_persistence_across_operations(request_queue_apify: RequestQueue) -> None: +async def test_persistence_across_operations( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test that queue state persists across different operations.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') # Open queue and add some requests rq = request_queue_apify @@ -682,12 +674,12 @@ async def test_persistence_across_operations(request_queue_apify: RequestQueue) initial_total = await rq.get_total_count() Actor.log.info(f'Total count after initial batch: {initial_total}') - # Process some requests + # Process some requests. processed_count = 0 for _ in range(5): - request = await rq.fetch_next_request() - if request: - await rq.mark_request_as_handled(request) + next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + if next_request: + await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processed {processed_count} requests from initial batch') @@ -708,18 +700,14 @@ async def test_persistence_across_operations(request_queue_apify: RequestQueue) assert total_after_additional == 15, f'total_after_additional={total_after_additional}' assert handled_after_additional == 5, f'handled_after_additional={handled_after_additional}' - # Process remaining + # Process remaining. 
remaining_processed = 0 - while not await rq.is_finished(): - request = await rq.fetch_next_request() - if request: - remaining_processed += 1 - await rq.mark_request_as_handled(request) - else: - break + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + remaining_processed += 1 + await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -786,8 +774,12 @@ async def test_request_deduplication_edge_cases( ) -async def test_request_ordering_with_mixed_operations(request_queue_apify: RequestQueue) -> None: +async def test_request_ordering_with_mixed_operations( + request_queue_apify: RequestQueue, + request: pytest.FixtureRequest, +) -> None: """Test request ordering with mixed add/reclaim operations.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -797,8 +789,8 @@ async def test_request_ordering_with_mixed_operations(request_queue_apify: Reque await rq.add_request('https://example.com/2') Actor.log.info('Added initial requests') - # Fetch one and reclaim to forefront - request1 = await rq.fetch_next_request() + # Fetch one and reclaim to forefront. 
+ request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request1 is not None, f'request1={request1}' assert request1.url == 'https://example.com/1', f'request1.url={request1.url}' Actor.log.info(f'Fetched request: {request1.url}') @@ -810,9 +802,9 @@ async def test_request_ordering_with_mixed_operations(request_queue_apify: Reque await rq.add_request('https://example.com/priority', forefront=True) Actor.log.info('Added new forefront request') - # Fetch all requests and verify forefront behavior + # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() - while next_request := await rq.fetch_next_request(): + while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -1046,38 +1038,38 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await rq.fetch_next_request() + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - if rq_access_mode == 'shared': - is_finished = await call_with_exp_backoff(rq.is_finished) - else: - is_finished = await rq.is_finished() + is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) assert is_finished async def test_pre_existing_request_with_user_data( - request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync + request_queue_apify: RequestQueue, + apify_client_async: ApifyClientAsync, + request: pytest.FixtureRequest, ) -> None: """Test that pre-existing requests with user data are fully fetched. 
list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" + rq_access_mode = request.node.callspec.params.get('request_queue_apify') custom_data = {'key': 'value'} rq = request_queue_apify - request = Request.from_url( + req = Request.from_url( 'https://example.com', user_data=custom_data.copy(), ) # Add request by a different producer rq_client = apify_client_async.request_queue(request_queue_id=rq.id) - await rq_client.add_request(request.model_dump(by_alias=True)) + await rq_client.add_request(req.model_dump(by_alias=True)) - # Fetch the request by the client under test - request_obtained = await rq.fetch_next_request() + # Fetch the request by the client under test. + request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1111,17 +1103,14 @@ async def test_request_queue_is_finished( await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await request_queue_apify.fetch_next_request() + fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' 
) await request_queue_apify.mark_request_as_handled(fetched) - if rq_access_mode == 'shared': - assert await call_with_exp_backoff(request_queue_apify.is_finished) - else: - assert await request_queue_apify.is_finished() + assert await call_with_exp_backoff(request_queue_apify.is_finished, rq_access_mode=rq_access_mode) async def test_request_queue_deduplication_unprocessed_requests( @@ -1387,13 +1376,9 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while not await rq.is_finished(): - request = await rq.fetch_next_request() - if request: - remaining_count += 1 - await rq.mark_request_as_handled(request) - else: - break + while request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + remaining_count += 1 + await rq.mark_request_as_handled(request) final_handled = await rq.get_handled_count() final_total = await rq.get_total_count()