Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ async def close_task(self, req):
3. Wait for all the results from the task to be written to the global result queue
4. Remove the task from the tracker
"""
d = req.payload
keys = d.keys
# Use keyed_window.keys for task lookup since payload.keys may be empty
# (e.g., CLOSE operations don't carry data, so payload.keys is not populated).
keys = req.keyed_window.keys
unified_key = build_unique_key_name(keys)
curr_task = self.tasks.get(unified_key, None)

Expand All @@ -127,7 +128,9 @@ async def create_task(self, req):
it creates a new task or appends the request to the existing task.
"""
d = req.payload
keys = d.keys
# Use keyed_window.keys for task lookup — the authoritative key identity
# for the window, consistent across all operation types (OPEN, APPEND, CLOSE).
keys = req.keyed_window.keys
unified_key = build_unique_key_name(keys)
curr_task = self.tasks.get(unified_key, None)

Expand Down Expand Up @@ -178,7 +181,8 @@ async def send_datum_to_task(self, req):
If the task does not exist, create it.
"""
d = req.payload
keys = d.keys
# Use keyed_window.keys for task lookup to match the key used in create_task/close_task.
keys = req.keyed_window.keys
unified_key = build_unique_key_name(keys)
result = self.tasks.get(unified_key, None)
if not result:
Expand Down
25 changes: 20 additions & 5 deletions packages/pynumaflow/tests/accumulator/test_async_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
def request_generator(count, request, resetkey: bool = False, send_close: bool = False):
for i in range(count):
if resetkey:
# Clear previous keys and add new ones
# Update keys on both payload and keyedWindow to match real platform behavior
del request.payload.keys[:]
request.payload.keys.extend([f"key-{i}"])
del request.operation.keyedWindow.keys[:]
request.operation.keyedWindow.keys.extend([f"key-{i}"])

# Set operation based on index - first is OPEN, rest are APPEND
if i == 0:
Expand All @@ -52,9 +54,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool =
def request_generator_append_only(count, request, resetkey: bool = False):
for i in range(count):
if resetkey:
# Clear previous keys and add new ones
# Update keys on both payload and keyedWindow to match real platform behavior
del request.payload.keys[:]
request.payload.keys.extend([f"key-{i}"])
del request.operation.keyedWindow.keys[:]
request.operation.keyedWindow.keys.extend([f"key-{i}"])

# Set operation to APPEND for all requests
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND
Expand All @@ -64,9 +68,11 @@ def request_generator_append_only(count, request, resetkey: bool = False):
def request_generator_mixed(count, request, resetkey: bool = False):
for i in range(count):
if resetkey:
# Clear previous keys and add new ones
# Update keys on both payload and keyedWindow to match real platform behavior
del request.payload.keys[:]
request.payload.keys.extend([f"key-{i}"])
del request.operation.keyedWindow.keys[:]
request.operation.keyedWindow.keys.extend([f"key-{i}"])

if i % 2 == 0:
# Set operation to APPEND for even requests
Expand Down Expand Up @@ -107,17 +113,26 @@ def start_request() -> accumulator_pb2.AccumulatorRequest:

def start_request_without_open() -> accumulator_pb2.AccumulatorRequest:
event_time_timestamp, watermark_timestamp = get_time_args()

window = accumulator_pb2.KeyedWindow(
start=mock_interval_window_start(),
end=mock_interval_window_end(),
slot="slot-0",
keys=["test_key"],
)
payload = accumulator_pb2.Payload(
keys=["test_key"],
value=mock_message(),
event_time=event_time_timestamp,
watermark=watermark_timestamp,
id="test_id",
)

operation = accumulator_pb2.AccumulatorRequest.WindowOperation(
event=accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND,
keyedWindow=window,
)
request = accumulator_pb2.AccumulatorRequest(
payload=payload,
operation=operation,
)
return request

Expand Down