diff --git a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py index 77c3b294..0f30623f 100644 --- a/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py +++ b/packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py @@ -104,8 +104,9 @@ async def close_task(self, req): 3. Wait for all the results from the task to be written to the global result queue 4. Remove the task from the tracker """ - d = req.payload - keys = d.keys + # Use keyed_window.keys for task lookup since payload.keys may be empty + # (e.g., CLOSE operations don't carry data, so payload.keys is not populated). + keys = req.keyed_window.keys unified_key = build_unique_key_name(keys) curr_task = self.tasks.get(unified_key, None) @@ -127,7 +128,9 @@ async def create_task(self, req): it creates a new task or appends the request to the existing task. """ d = req.payload - keys = d.keys + # Use keyed_window.keys for task lookup — the authoritative key identity + # for the window, consistent across all operation types (OPEN, APPEND, CLOSE). + keys = req.keyed_window.keys unified_key = build_unique_key_name(keys) curr_task = self.tasks.get(unified_key, None) @@ -178,7 +181,8 @@ async def send_datum_to_task(self, req): If the task does not exist, create it. """ d = req.payload - keys = d.keys + # Use keyed_window.keys for task lookup to match the key used in create_task/close_task. + keys = req.keyed_window.keys unified_key = build_unique_key_name(keys) result = self.tasks.get(unified_key, None) if not result: diff --git a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py index e0927f8e..fa8f0d60 100644 --- a/packages/pynumaflow/tests/accumulator/test_async_accumulator.py +++ b/packages/pynumaflow/tests/accumulator/test_async_accumulator.py @@ -30,9 +30,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool = False): for i in range(count): if resetkey: - # Clear previous keys and add new ones + # Update keys on both payload and keyedWindow to match real platform behavior del request.payload.keys[:] request.payload.keys.extend([f"key-{i}"]) + del request.operation.keyedWindow.keys[:] + request.operation.keyedWindow.keys.extend([f"key-{i}"]) # Set operation based on index - first is OPEN, rest are APPEND if i == 0: @@ -52,9 +54,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool = def request_generator_append_only(count, request, resetkey: bool = False): for i in range(count): if resetkey: - # Clear previous keys and add new ones + # Update keys on both payload and keyedWindow to match real platform behavior del request.payload.keys[:] request.payload.keys.extend([f"key-{i}"]) + del request.operation.keyedWindow.keys[:] + request.operation.keyedWindow.keys.extend([f"key-{i}"]) # Set operation to APPEND for all requests request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND @@ -64,9 +68,11 @@ def request_generator_append_only(count, request, resetkey: bool = False): def request_generator_mixed(count, request, resetkey: bool = False): for i in range(count): if resetkey: - # Clear previous keys and add new ones + # Update keys on both payload and keyedWindow to match real platform behavior del request.payload.keys[:] request.payload.keys.extend([f"key-{i}"]) + del request.operation.keyedWindow.keys[:] + request.operation.keyedWindow.keys.extend([f"key-{i}"]) if i % 2 == 0: # Set operation to APPEND for even requests @@ -107,7 +113,12 @@ def start_request() -> accumulator_pb2.AccumulatorRequest: def start_request_without_open() -> accumulator_pb2.AccumulatorRequest: event_time_timestamp, watermark_timestamp = get_time_args() - + window = accumulator_pb2.KeyedWindow( + start=mock_interval_window_start(), + end=mock_interval_window_end(), + slot="slot-0", + keys=["test_key"], + ) payload = accumulator_pb2.Payload( keys=["test_key"], value=mock_message(), @@ -115,9 +126,13 @@ def start_request_without_open() -> accumulator_pb2.AccumulatorRequest: watermark=watermark_timestamp, id="test_id", ) - + operation = accumulator_pb2.AccumulatorRequest.WindowOperation( + event=accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND, + keyedWindow=window, + ) request = accumulator_pb2.AccumulatorRequest( payload=payload, + operation=operation, ) return request