diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 170deb3a..4b6a4221 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -66,7 +66,7 @@ async def test_request_queue_collection_get_or_create(client: ApifyClient | Apif await maybe_await(client.request_queue(rq.id).delete()) -async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> None: +async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None: result = await maybe_await(client.request_queues().get_or_create(name=get_random_resource_name('queue'))) created_rq = cast('RequestQueue', result) rq = client.request_queue(created_rq.id, client_key=get_random_string(10)) @@ -78,8 +78,17 @@ async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> Non rq.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'}) ) - result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10))) - get_head_and_lock_response = cast('LockedRequestQueueHead', result) + # Poll until all requests are available for locking (eventual consistency) + get_head_and_lock_response: LockedRequestQueueHead | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10))) + get_head_and_lock_response = cast('LockedRequestQueueHead', result) + if len(get_head_and_lock_response.items) == 10: + break + + assert get_head_and_lock_response is not None + assert len(get_head_and_lock_response.items) == 10 for locked_request in get_head_and_lock_response.items: assert locked_request.lock_expires_at is not None @@ -219,12 +228,15 @@ async def test_request_queue_list_head(client: ApifyClient | ApifyClientAsync, * ) ) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until requests are available (eventual consistency) + head_response: RequestQueueHead | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_head(limit=3)) + head_response = cast('RequestQueueHead', result) + if len(head_response.items) == 3: + break - # List head - result = await maybe_await(rq_client.list_head(limit=3)) - head_response = cast('RequestQueueHead', result) assert head_response is not None assert len(head_response.items) == 3 finally: @@ -251,12 +263,15 @@ async def test_request_queue_list_requests(client: ApifyClient | ApifyClientAsyn ) ) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all requests are available (eventual consistency) + list_response: ListOfRequests | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_requests()) + list_response = cast('ListOfRequests', result) + if len(list_response.items) == 5: + break - # List all requests - result = await maybe_await(rq_client.list_requests()) - list_response = cast('ListOfRequests', result) assert list_response is not None assert len(list_response.items) == 5 finally: @@ -320,12 +335,16 @@ async def test_request_queue_batch_add_requests(client: ApifyClient | ApifyClien assert len(batch_response.processed_requests) == 10 assert len(batch_response.unprocessed_requests) == 0 - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all requests are available (eventual consistency) + list_response: ListOfRequests | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_requests()) + list_response = cast('ListOfRequests', result) + if len(list_response.items) == 10: + break - # Verify requests were added - result = await maybe_await(rq_client.list_requests()) - list_response = cast('ListOfRequests', result) + assert list_response is not None assert len(list_response.items) == 10 finally: await maybe_await(rq_client.delete()) @@ -351,12 +370,17 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl ) ) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until all requests are available (eventual consistency) + list_response: ListOfRequests | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_requests()) + list_response = cast('ListOfRequests', result) + if len(list_response.items) == 10: + break - # List requests to get IDs - result = await maybe_await(rq_client.list_requests()) - list_response = cast('ListOfRequests', result) + assert list_response is not None + assert len(list_response.items) == 10 requests_to_delete = [{'uniqueKey': item.unique_key} for item in list_response.items[:5]] # Batch delete @@ -365,12 +389,16 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl assert delete_response is not None assert len(delete_response.processed_requests) == 5 - # Wait briefly - await maybe_sleep(1, is_async=is_async) + # Poll until deletions are reflected (eventual consistency) + remaining: ListOfRequests | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_requests()) + remaining = cast('ListOfRequests', result) + if len(remaining.items) == 5: + break - # Verify remaining requests - result = await maybe_await(rq_client.list_requests()) - remaining = cast('ListOfRequests', result) + assert remaining is not None assert len(remaining.items) == 5 finally: await maybe_await(rq_client.delete()) @@ -405,12 +433,15 @@ async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClien for i in range(5): await maybe_await(rq_client.add_request({'url': f'https://example.com/lock-{i}', 'uniqueKey': f'lock-{i}'})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until requests are available for locking (eventual consistency) + lock_response: LockedRequestQueueHead | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60))) + lock_response = cast('LockedRequestQueueHead', result) + if len(lock_response.items) == 3: + break - # Lock head requests - result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60))) - lock_response = cast('LockedRequestQueueHead', result) assert lock_response is not None assert len(lock_response.items) == 3 @@ -434,12 +465,16 @@ async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyCli # Add a request await maybe_await(rq_client.add_request({'url': 'https://example.com/prolong', 'uniqueKey': 'prolong-test'})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the request is available for locking (eventual consistency) + lock_response: LockedRequestQueueHead | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) + lock_response = cast('LockedRequestQueueHead', result) + if len(lock_response.items) == 1: + break - # Lock the request - result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) - lock_response = cast('LockedRequestQueueHead', result) + assert lock_response is not None assert len(lock_response.items) == 1 locked_request = lock_response.items[0] original_lock_expires = locked_request.lock_expires_at @@ -468,12 +503,16 @@ async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClie # Add a request await maybe_await(rq_client.add_request({'url': 'https://example.com/unlock', 'uniqueKey': 'unlock-test'})) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until the request is available for locking (eventual consistency) + lock_response: LockedRequestQueueHead | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) + lock_response = cast('LockedRequestQueueHead', result) + if len(lock_response.items) == 1: + break - # Lock the request - result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60))) - lock_response = cast('LockedRequestQueueHead', result) + assert lock_response is not None assert len(lock_response.items) == 1 locked_request = lock_response.items[0] @@ -503,12 +542,16 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs rq_client.add_request({'url': f'https://example.com/unlock-{i}', 'uniqueKey': f'unlock-{i}'}) ) - # Wait briefly for eventual consistency - await maybe_sleep(1, is_async=is_async) + # Poll until requests are available for locking (eventual consistency) + lock_response: LockedRequestQueueHead | None = None + for _ in range(5): + await maybe_sleep(1, is_async=is_async) + result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60))) + lock_response = cast('LockedRequestQueueHead', result) + if len(lock_response.items) == 3: + break - # Lock some requests - result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60))) - lock_response = cast('LockedRequestQueueHead', result) + assert lock_response is not None assert len(lock_response.items) == 3 # Unlock all requests