Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 91 additions & 48 deletions tests/integration/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def test_request_queue_collection_get_or_create(client: ApifyClient | Apif
await maybe_await(client.request_queue(rq.id).delete())


async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> None:
async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync, *, is_async: bool) -> None:
result = await maybe_await(client.request_queues().get_or_create(name=get_random_resource_name('queue')))
created_rq = cast('RequestQueue', result)
rq = client.request_queue(created_rq.id, client_key=get_random_string(10))
Expand All @@ -78,8 +78,17 @@ async def test_request_queue_lock(client: ApifyClient | ApifyClientAsync) -> Non
rq.add_request({'url': f'http://test-lock.com/{i}', 'uniqueKey': f'http://test-lock.com/{i}'})
)

result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10)))
get_head_and_lock_response = cast('LockedRequestQueueHead', result)
# Poll until all requests are available for locking (eventual consistency)
get_head_and_lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq.list_and_lock_head(limit=10, lock_duration=timedelta(seconds=10)))
get_head_and_lock_response = cast('LockedRequestQueueHead', result)
if len(get_head_and_lock_response.items) == 10:
break

assert get_head_and_lock_response is not None
assert len(get_head_and_lock_response.items) == 10

for locked_request in get_head_and_lock_response.items:
assert locked_request.lock_expires_at is not None
Expand Down Expand Up @@ -219,12 +228,15 @@ async def test_request_queue_list_head(client: ApifyClient | ApifyClientAsync, *
)
)

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until requests are available (eventual consistency)
head_response: RequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_head(limit=3))
head_response = cast('RequestQueueHead', result)
if len(head_response.items) == 3:
break

# List head
result = await maybe_await(rq_client.list_head(limit=3))
head_response = cast('RequestQueueHead', result)
assert head_response is not None
assert len(head_response.items) == 3
finally:
Expand All @@ -251,12 +263,15 @@ async def test_request_queue_list_requests(client: ApifyClient | ApifyClientAsyn
)
)

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until all requests are available (eventual consistency)
list_response: ListOfRequests | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_requests())
list_response = cast('ListOfRequests', result)
if len(list_response.items) == 5:
break

# List all requests
result = await maybe_await(rq_client.list_requests())
list_response = cast('ListOfRequests', result)
assert list_response is not None
assert len(list_response.items) == 5
finally:
Expand Down Expand Up @@ -320,12 +335,16 @@ async def test_request_queue_batch_add_requests(client: ApifyClient | ApifyClien
assert len(batch_response.processed_requests) == 10
assert len(batch_response.unprocessed_requests) == 0

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until all requests are available (eventual consistency)
list_response: ListOfRequests | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_requests())
list_response = cast('ListOfRequests', result)
if len(list_response.items) == 10:
break

# Verify requests were added
result = await maybe_await(rq_client.list_requests())
list_response = cast('ListOfRequests', result)
assert list_response is not None
assert len(list_response.items) == 10
finally:
await maybe_await(rq_client.delete())
Expand All @@ -351,12 +370,17 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl
)
)

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until all requests are available (eventual consistency)
list_response: ListOfRequests | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_requests())
list_response = cast('ListOfRequests', result)
if len(list_response.items) == 10:
break

# List requests to get IDs
result = await maybe_await(rq_client.list_requests())
list_response = cast('ListOfRequests', result)
assert list_response is not None
assert len(list_response.items) == 10
requests_to_delete = [{'uniqueKey': item.unique_key} for item in list_response.items[:5]]

# Batch delete
Expand All @@ -365,12 +389,16 @@ async def test_request_queue_batch_delete_requests(client: ApifyClient | ApifyCl
assert delete_response is not None
assert len(delete_response.processed_requests) == 5

# Wait briefly
await maybe_sleep(1, is_async=is_async)
# Poll until deletions are reflected (eventual consistency)
remaining: ListOfRequests | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_requests())
remaining = cast('ListOfRequests', result)
if len(remaining.items) == 5:
break

# Verify remaining requests
result = await maybe_await(rq_client.list_requests())
remaining = cast('ListOfRequests', result)
assert remaining is not None
assert len(remaining.items) == 5
finally:
await maybe_await(rq_client.delete())
Expand Down Expand Up @@ -405,12 +433,15 @@ async def test_request_queue_list_and_lock_head(client: ApifyClient | ApifyClien
for i in range(5):
await maybe_await(rq_client.add_request({'url': f'https://example.com/lock-{i}', 'uniqueKey': f'lock-{i}'}))

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until requests are available for locking (eventual consistency)
lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
if len(lock_response.items) == 3:
break

# Lock head requests
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
assert lock_response is not None
assert len(lock_response.items) == 3

Expand All @@ -434,12 +465,16 @@ async def test_request_queue_prolong_request_lock(client: ApifyClient | ApifyCli
# Add a request
await maybe_await(rq_client.add_request({'url': 'https://example.com/prolong', 'uniqueKey': 'prolong-test'}))

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until the request is available for locking (eventual consistency)
lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
if len(lock_response.items) == 1:
break

# Lock the request
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
assert lock_response is not None
assert len(lock_response.items) == 1
locked_request = lock_response.items[0]
original_lock_expires = locked_request.lock_expires_at
Expand Down Expand Up @@ -468,12 +503,16 @@ async def test_request_queue_delete_request_lock(client: ApifyClient | ApifyClie
# Add a request
await maybe_await(rq_client.add_request({'url': 'https://example.com/unlock', 'uniqueKey': 'unlock-test'}))

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until the request is available for locking (eventual consistency)
lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
if len(lock_response.items) == 1:
break

# Lock the request
result = await maybe_await(rq_client.list_and_lock_head(limit=1, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
assert lock_response is not None
assert len(lock_response.items) == 1
locked_request = lock_response.items[0]

Expand Down Expand Up @@ -503,12 +542,16 @@ async def test_request_queue_unlock_requests(client: ApifyClient | ApifyClientAs
rq_client.add_request({'url': f'https://example.com/unlock-{i}', 'uniqueKey': f'unlock-{i}'})
)

# Wait briefly for eventual consistency
await maybe_sleep(1, is_async=is_async)
# Poll until requests are available for locking (eventual consistency)
lock_response: LockedRequestQueueHead | None = None
for _ in range(5):
await maybe_sleep(1, is_async=is_async)
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
if len(lock_response.items) == 3:
break

# Lock some requests
result = await maybe_await(rq_client.list_and_lock_head(limit=3, lock_duration=timedelta(seconds=60)))
lock_response = cast('LockedRequestQueueHead', result)
assert lock_response is not None
assert len(lock_response.items) == 3

# Unlock all requests
Expand Down
Loading