From 37ed13f7673b66035c3000740e4ea1b92e993865 Mon Sep 17 00:00:00 2001
From: Li Jiajia
Date: Sun, 5 Apr 2026 22:23:06 +0800
Subject: [PATCH] [python] Fix read_entries_parallel to use order-dependent merge via FileEntry.merge_entries

read_entries_parallel used to collect the identifiers of all DELETE
entries into one set and then drop every ADD entry whose identifier was
in that set, regardless of where the ADD appeared relative to the
DELETE. A file that was deleted and later re-added was therefore
filtered out of the result.

Concatenate the entries of all manifests, in the order the manifest
files are passed in, and hand them to FileEntry.merge_entries instead.

Behaviour changes covered by the tests:
- an ADD that follows a DELETE of the same identifier survives;
- a DELETE without a matching ADD is kept in the result.
---
NOTE(review): the From: header above carries no e-mail address; add one
before feeding this patch to `git am` (it may refuse the mail otherwise).

 .../manifest/manifest_file_manager.py |  29 +---
 .../manifest_entry_identifier_test.py | 151 +++++++++++++++++-
 2 files changed, 150 insertions(+), 30 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 0ed50918253c..951ff0d1438c 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -24,6 +24,7 @@
 from datetime import datetime
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.file_entry import FileEntry
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
@@ -53,33 +54,13 @@ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest
         def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]:
             return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats)
 
-        def _entry_identifier(e: ManifestEntry) -> tuple:
-            return (
-                tuple(e.partition.values),
-                e.bucket,
-                e.file.level,
-                e.file.file_name,
-                tuple(e.file.extra_files) if e.file.extra_files else (),
-                e.file.embedded_index,
-                e.file.external_path,
-            )
-
-        deleted_entry_keys = set()
-        added_entries = []
+        all_entries = []
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
             future_results = executor.map(_process_single_manifest, manifest_files)
             for entries in future_results:
-                for entry in entries:
-                    if entry.kind == 0:  # ADD
-                        added_entries.append(entry)
-                    else:  # DELETE
-                        deleted_entry_keys.add(_entry_identifier(entry))
-
-        final_entries = [
-            entry for entry in added_entries
-            if _entry_identifier(entry) not in deleted_entry_keys
-        ]
-        return final_entries
+                all_entries.extend(entries)
+
+        return FileEntry.merge_entries(all_entries)
 
     def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
         manifest_file_path = f"{self.manifest_path}/{manifest_file_name}"
diff --git a/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py b/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
index f78c86bce09d..5534e8c681d7 100644
--- a/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
+++ b/paimon-python/pypaimon/tests/manifest/manifest_entry_identifier_test.py
@@ -200,9 +200,10 @@ def test_add_delete_different_levels(self):
         final_entries = self.manifest_file_manager.read_entries_parallel(
             [manifest_file_1, manifest_file_2])
 
-        # Different levels -> different identifiers -> should NOT match
-        self.assertEqual(len(final_entries), 1, "Different levels should NOT match")
-        self.assertEqual(final_entries[0].file.level, 0, "ADD entry with level=0 should remain")
+        # Different levels -> different identifiers -> both preserved
+        self.assertEqual(len(final_entries), 2, "Different levels should NOT match")
+        kinds = {e.kind for e in final_entries}
+        self.assertEqual(kinds, {0, 1}, "Both ADD and DELETE should remain")
 
     def test_add_delete_different_extra_files(self):
         """
@@ -250,9 +251,147 @@ def test_add_delete_different_extra_files(self):
         final_entries = self.manifest_file_manager.read_entries_parallel(
             [manifest_file_1, manifest_file_2])
 
-        # Different extra_files -> different identifiers -> should NOT match
-        self.assertEqual(len(final_entries), 1, "Different extra_files should NOT match")
-        self.assertEqual(final_entries[0].file.extra_files, ["index.idx"])
+        # Different extra_files -> different identifiers -> both preserved
+        self.assertEqual(len(final_entries), 2, "Different extra_files should NOT match")
+        kinds = {e.kind for e in final_entries}
+        self.assertEqual(kinds, {0, 1}, "Both ADD and DELETE should remain")
+
+    def test_delete_then_add_same_identifier(self):
+        """DELETE followed by ADD of the same identifier (compaction scenario).
+
+        - Manifest 1: ADD(file_A)
+        - Manifest 2: DELETE(file_A), ADD(file_A) (re-added after compaction)
+        - Expected: file_A exists (the re-ADD survives)
+        - Old bug: global DELETE set filtered ALL matching ADDs regardless of order.
+        """
+        partition = GenericRow([], [])
+
+        add_entry_v1 = ManifestEntry(
+            kind=0, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-1.parquet", level=0)
+        )
+        delete_entry = ManifestEntry(
+            kind=1, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-1.parquet", level=0)
+        )
+        add_entry_v2 = ManifestEntry(
+            kind=0, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-1.parquet", level=0)
+        )
+
+        manifest_file_1 = ManifestFileMeta(
+            file_name="manifest-1.avro", file_size=1024,
+            num_added_files=1, num_deleted_files=0,
+            partition_stats=SimpleStats.empty_stats(), schema_id=0
+        )
+        manifest_file_2 = ManifestFileMeta(
+            file_name="manifest-2.avro", file_size=1024,
+            num_added_files=1, num_deleted_files=1,
+            partition_stats=SimpleStats.empty_stats(), schema_id=0
+        )
+
+        self.manifest_file_manager.write(manifest_file_1.file_name, [add_entry_v1])
+        self.manifest_file_manager.write(manifest_file_2.file_name, [delete_entry, add_entry_v2])
+
+        final_entries = self.manifest_file_manager.read_entries_parallel(
+            [manifest_file_1, manifest_file_2])
+
+        # ADD(v1) cancelled by DELETE, then ADD(v2) re-added -> 1 entry
+        self.assertEqual(len(final_entries), 1,
+                         "Re-added file after DELETE should survive")
+        self.assertEqual(final_entries[0].kind, 0, "Surviving entry should be ADD")
+
+    def test_unmatched_delete_preserved(self):
+        """A DELETE without a matching ADD should be preserved in the output."""
+        partition = GenericRow([], [])
+
+        delete_entry = ManifestEntry(
+            kind=1, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-orphan.parquet", level=0)
+        )
+
+        manifest_file = ManifestFileMeta(
+            file_name="manifest-1.avro", file_size=1024,
+            num_added_files=0, num_deleted_files=1,
+            partition_stats=SimpleStats.empty_stats(), schema_id=0
+        )
+
+        self.manifest_file_manager.write(manifest_file.file_name, [delete_entry])
+
+        final_entries = self.manifest_file_manager.read_entries_parallel([manifest_file])
+
+        self.assertEqual(len(final_entries), 1, "Unmatched DELETE should be preserved")
+        self.assertEqual(final_entries[0].kind, 1)
+
+
+class MergeEntriesUnitTest(unittest.TestCase):
+    """Unit tests for FileEntry.merge_entries without Avro I/O."""
+
+    def _create_file_meta(self, file_name, level=0):
+        from pypaimon.data.timestamp import Timestamp
+        return DataFileMeta(
+            file_name=file_name, file_size=1024, row_count=100,
+            min_key=GenericRow([], []), max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(), value_stats=SimpleStats.empty_stats(),
+            min_sequence_number=1, max_sequence_number=100, schema_id=0,
+            level=level, extra_files=[],
+            creation_time=Timestamp.from_epoch_millis(0), delete_row_count=0,
+            embedded_index=None, file_source=None, value_stats_cols=None,
+            external_path=None, first_row_id=None, write_cols=None
+        )
+
+    def _entry(self, kind, partition_values, bucket, file_name, level=0):
+        from pypaimon.schema.data_types import DataField, AtomicType
+        fields = [DataField(0, 'pt', AtomicType('STRING'))] if partition_values else []
+        partition = GenericRow(partition_values, fields)
+        return ManifestEntry(
+            kind=kind, partition=partition, bucket=bucket, total_buckets=1,
+            file=self._create_file_meta(file_name, level=level)
+        )
+
+    def test_different_partitions_not_matched(self):
+        """Same file_name but different partitions should not cancel."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, ['p1'], 0, 'data.parquet'),
+            self._entry(1, ['p2'], 0, 'data.parquet'),
+        ]
+        result = FileEntry.merge_entries(entries)
+        self.assertEqual(len(result), 2)
+
+    def test_different_buckets_not_matched(self):
+        """Same file_name but different buckets should not cancel."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, [], 0, 'data.parquet'),
+            self._entry(1, [], 1, 'data.parquet'),
+        ]
+        result = FileEntry.merge_entries(entries)
+        self.assertEqual(len(result), 2)
+
+    def test_duplicate_add_raises_error(self):
+        """Adding the same file twice should raise RuntimeError."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, [], 0, 'data.parquet'),
+            self._entry(0, [], 0, 'data.parquet'),
+        ]
+        with self.assertRaises(RuntimeError):
+            FileEntry.merge_entries(entries)
+
+    def test_multi_partition_merge(self):
+        """Multiple partitions with interleaved ADD/DELETE."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, ['p1'], 0, 'a.parquet'),
+            self._entry(0, ['p2'], 0, 'b.parquet'),
+            self._entry(1, ['p1'], 0, 'a.parquet'),  # cancels p1/a
+            self._entry(0, ['p1'], 0, 'c.parquet'),
+        ]
+        result = FileEntry.merge_entries(entries)
+        self.assertEqual(len(result), 2)
+        names = {e.file.file_name for e in result}
+        self.assertEqual(names, {'b.parquet', 'c.parquet'})
 
 
 if __name__ == '__main__':