29 changes: 5 additions & 24 deletions paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -24,6 +24,7 @@
 from datetime import datetime
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.file_entry import FileEntry
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
@@ -53,33 +54,13 @@ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest
         def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]:
             return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats)
 
-        def _entry_identifier(e: ManifestEntry) -> tuple:
-            return (
-                tuple(e.partition.values),
-                e.bucket,
-                e.file.level,
-                e.file.file_name,
-                tuple(e.file.extra_files) if e.file.extra_files else (),
-                e.file.embedded_index,
-                e.file.external_path,
-            )
-
-        deleted_entry_keys = set()
-        added_entries = []
+        all_entries = []
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
             future_results = executor.map(_process_single_manifest, manifest_files)
             for entries in future_results:
-                for entry in entries:
-                    if entry.kind == 0:  # ADD
-                        added_entries.append(entry)
-                    else:  # DELETE
-                        deleted_entry_keys.add(_entry_identifier(entry))
-
-        final_entries = [
-            entry for entry in added_entries
-            if _entry_identifier(entry) not in deleted_entry_keys
-        ]
-        return final_entries
+                all_entries.extend(entries)
+
+        return FileEntry.merge_entries(all_entries)
 
     def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]:
         manifest_file_path = f"{self.manifest_path}/{manifest_file_name}"
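The new FileEntry.merge_entries is called above but its implementation is not part of this diff. Below is a minimal sketch of the order-aware merge that the tests further down pin down; merge_entries_sketch and _identifier are hypothetical names (the identifier mirrors the removed _entry_identifier), and the real method may differ.

def merge_entries_sketch(entries):
    # Hypothetical reconstruction, for illustration only; the actual
    # FileEntry.merge_entries is not shown in this diff.
    def _identifier(e):
        # Mirrors the removed _entry_identifier: the fields that make two
        # entries refer to the same physical file.
        return (
            tuple(e.partition.values),
            e.bucket,
            e.file.level,
            e.file.file_name,
            tuple(e.file.extra_files) if e.file.extra_files else (),
            e.file.embedded_index,
            e.file.external_path,
        )

    merged = {}  # identifier -> surviving entry, in first-seen order
    for entry in entries:  # order matters: entries arrive in manifest order
        key = _identifier(entry)
        if entry.kind == 0:  # ADD
            if key in merged and merged[key].kind == 0:
                raise RuntimeError(f"Duplicate ADD for {key}")
            merged[key] = entry  # inserts, or follows an earlier unmatched DELETE
        else:  # DELETE
            if key in merged and merged[key].kind == 0:
                del merged[key]  # cancels the matching earlier ADD
            else:
                merged[key] = entry  # unmatched DELETE is preserved
    return list(merged.values())

Compared with the removed logic, entries are merged in manifest order through a single keyed map instead of collecting a global DELETE set and filtering every ADD afterwards; that ordering is what lets a file re-added after compaction survive.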
@@ -200,9 +200,10 @@ def test_add_delete_different_levels(self):
         final_entries = self.manifest_file_manager.read_entries_parallel(
             [manifest_file_1, manifest_file_2])
 
-        # Different levels -> different identifiers -> should NOT match
-        self.assertEqual(len(final_entries), 1, "Different levels should NOT match")
-        self.assertEqual(final_entries[0].file.level, 0, "ADD entry with level=0 should remain")
+        # Different levels -> different identifiers -> both preserved
+        self.assertEqual(len(final_entries), 2, "Different levels should NOT match")
+        kinds = {e.kind for e in final_entries}
+        self.assertEqual(kinds, {0, 1}, "Both ADD and DELETE should remain")
 
     def test_add_delete_different_extra_files(self):
         """
@@ -250,9 +251,147 @@ def test_add_delete_different_extra_files(self):
         final_entries = self.manifest_file_manager.read_entries_parallel(
             [manifest_file_1, manifest_file_2])
 
-        # Different extra_files -> different identifiers -> should NOT match
-        self.assertEqual(len(final_entries), 1, "Different extra_files should NOT match")
-        self.assertEqual(final_entries[0].file.extra_files, ["index.idx"])
+        # Different extra_files -> different identifiers -> both preserved
+        self.assertEqual(len(final_entries), 2, "Different extra_files should NOT match")
+        kinds = {e.kind for e in final_entries}
+        self.assertEqual(kinds, {0, 1}, "Both ADD and DELETE should remain")
+
+    def test_delete_then_add_same_identifier(self):
+        """DELETE followed by ADD of the same identifier (compaction scenario).
+
+        - Manifest 1: ADD(file_A)
+        - Manifest 2: DELETE(file_A), ADD(file_A) (re-added after compaction)
+        - Expected: file_A exists (the re-ADD survives)
+        - Old bug: global DELETE set filtered ALL matching ADDs regardless of order.
+        """
+        partition = GenericRow([], [])
+
+        add_entry_v1 = ManifestEntry(
+            kind=0, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-1.parquet", level=0)
+        )
+        delete_entry = ManifestEntry(
+            kind=1, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-1.parquet", level=0)
+        )
+        add_entry_v2 = ManifestEntry(
+            kind=0, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-1.parquet", level=0)
+        )
+
+        manifest_file_1 = ManifestFileMeta(
+            file_name="manifest-1.avro", file_size=1024,
+            num_added_files=1, num_deleted_files=0,
+            partition_stats=SimpleStats.empty_stats(), schema_id=0
+        )
+        manifest_file_2 = ManifestFileMeta(
+            file_name="manifest-2.avro", file_size=1024,
+            num_added_files=1, num_deleted_files=1,
+            partition_stats=SimpleStats.empty_stats(), schema_id=0
+        )
+
+        self.manifest_file_manager.write(manifest_file_1.file_name, [add_entry_v1])
+        self.manifest_file_manager.write(manifest_file_2.file_name, [delete_entry, add_entry_v2])
+
+        final_entries = self.manifest_file_manager.read_entries_parallel(
+            [manifest_file_1, manifest_file_2])
+
+        # ADD(v1) cancelled by DELETE, then ADD(v2) re-added -> 1 entry
+        self.assertEqual(len(final_entries), 1,
+                         "Re-added file after DELETE should survive")
+        self.assertEqual(final_entries[0].kind, 0, "Surviving entry should be ADD")
+
+    def test_unmatched_delete_preserved(self):
+        """A DELETE without a matching ADD should be preserved in the output."""
+        partition = GenericRow([], [])
+
+        delete_entry = ManifestEntry(
+            kind=1, partition=partition, bucket=0, total_buckets=1,
+            file=self._create_file_meta("data-orphan.parquet", level=0)
+        )
+
+        manifest_file = ManifestFileMeta(
+            file_name="manifest-1.avro", file_size=1024,
+            num_added_files=0, num_deleted_files=1,
+            partition_stats=SimpleStats.empty_stats(), schema_id=0
+        )
+
+        self.manifest_file_manager.write(manifest_file.file_name, [delete_entry])
+
+        final_entries = self.manifest_file_manager.read_entries_parallel([manifest_file])
+
+        self.assertEqual(len(final_entries), 1, "Unmatched DELETE should be preserved")
+        self.assertEqual(final_entries[0].kind, 1)
+
+
+class MergeEntriesUnitTest(unittest.TestCase):
+    """Unit tests for FileEntry.merge_entries without Avro I/O."""
+
+    def _create_file_meta(self, file_name, level=0):
+        from pypaimon.data.timestamp import Timestamp
+        return DataFileMeta(
+            file_name=file_name, file_size=1024, row_count=100,
+            min_key=GenericRow([], []), max_key=GenericRow([], []),
+            key_stats=SimpleStats.empty_stats(), value_stats=SimpleStats.empty_stats(),
+            min_sequence_number=1, max_sequence_number=100, schema_id=0,
+            level=level, extra_files=[],
+            creation_time=Timestamp.from_epoch_millis(0), delete_row_count=0,
+            embedded_index=None, file_source=None, value_stats_cols=None,
+            external_path=None, first_row_id=None, write_cols=None
+        )
+
+    def _entry(self, kind, partition_values, bucket, file_name, level=0):
+        from pypaimon.schema.data_types import DataField, AtomicType
+        fields = [DataField(0, 'pt', AtomicType('STRING'))] if partition_values else []
+        partition = GenericRow(partition_values, fields)
+        return ManifestEntry(
+            kind=kind, partition=partition, bucket=bucket, total_buckets=1,
+            file=self._create_file_meta(file_name, level=level)
+        )
+
+    def test_different_partitions_not_matched(self):
+        """Same file_name but different partitions should not cancel."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, ['p1'], 0, 'data.parquet'),
+            self._entry(1, ['p2'], 0, 'data.parquet'),
+        ]
+        result = FileEntry.merge_entries(entries)
+        self.assertEqual(len(result), 2)
+
+    def test_different_buckets_not_matched(self):
+        """Same file_name but different buckets should not cancel."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, [], 0, 'data.parquet'),
+            self._entry(1, [], 1, 'data.parquet'),
+        ]
+        result = FileEntry.merge_entries(entries)
+        self.assertEqual(len(result), 2)
+
+    def test_duplicate_add_raises_error(self):
+        """Adding the same file twice should raise RuntimeError."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, [], 0, 'data.parquet'),
+            self._entry(0, [], 0, 'data.parquet'),
+        ]
+        with self.assertRaises(RuntimeError):
+            FileEntry.merge_entries(entries)
+
+    def test_multi_partition_merge(self):
+        """Multiple partitions with interleaved ADD/DELETE."""
+        from pypaimon.manifest.schema.file_entry import FileEntry
+        entries = [
+            self._entry(0, ['p1'], 0, 'a.parquet'),
+            self._entry(0, ['p2'], 0, 'b.parquet'),
+            self._entry(1, ['p1'], 0, 'a.parquet'),  # cancels p1/a
+            self._entry(0, ['p1'], 0, 'c.parquet'),
+        ]
+        result = FileEntry.merge_entries(entries)
+        self.assertEqual(len(result), 2)
+        names = {e.file.file_name for e in result}
+        self.assertEqual(names, {'b.parquet', 'c.parquet'})
 
 
 if __name__ == '__main__':
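As a usage check of the semantics above, the compaction scenario from test_delete_then_add_same_identifier can be replayed against merge_entries_sketch with lightweight stand-ins; FakeEntry and _file below are hypothetical test doubles, not pypaimon classes.

from dataclasses import dataclass, field
from types import SimpleNamespace


def _file(name, level=0):
    # Minimal stand-in exposing only the fields _identifier reads.
    return SimpleNamespace(level=level, file_name=name, extra_files=[],
                           embedded_index=None, external_path=None)


@dataclass
class FakeEntry:
    kind: int  # 0 = ADD, 1 = DELETE
    file: object
    bucket: int = 0
    partition: object = field(default_factory=lambda: SimpleNamespace(values=[]))


# ADD, DELETE, then re-ADD of the same file: only the re-ADD survives.
result = merge_entries_sketch([
    FakeEntry(kind=0, file=_file("data-1.parquet")),
    FakeEntry(kind=1, file=_file("data-1.parquet")),
    FakeEntry(kind=0, file=_file("data-1.parquet")),
])
assert len(result) == 1 and result[0].kind == 0

# An orphan DELETE is kept so downstream consumers still see the removal.
orphan = merge_entries_sketch([FakeEntry(kind=1, file=_file("gone.parquet"))])
assert len(orphan) == 1 and orphan[0].kind == 1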