-
Notifications
You must be signed in to change notification settings - Fork 491
Support replace_table and replace_table_transaction
#3220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
93d77d3
41da8bd
e649f95
8675256
fd7e11f
b4d76c1
1efa6fe
a813a92
5f1e0dc
bafa06b
1e6af57
14a6e03
eaba3ea
a620054
0847d20
4278b6a
08c5b2d
feff7dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,20 +42,25 @@ | |
| ) | ||
| from pyiceberg.io import FileIO, load_file_io | ||
| from pyiceberg.manifest import ManifestFile | ||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec | ||
| from pyiceberg.schema import Schema | ||
| from pyiceberg.partitioning import ( | ||
| UNPARTITIONED_PARTITION_SPEC, | ||
| PartitionSpec, | ||
| assign_fresh_partition_spec_ids_for_replace, | ||
| ) | ||
| from pyiceberg.schema import Schema, assign_fresh_schema_ids_for_replace | ||
| from pyiceberg.serializers import ToOutputFile | ||
| from pyiceberg.table import ( | ||
| DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, | ||
| CommitTableResponse, | ||
| CreateTableTransaction, | ||
| ReplaceTableTransaction, | ||
| StagedTable, | ||
| Table, | ||
| TableProperties, | ||
| ) | ||
| from pyiceberg.table.locations import load_location_provider | ||
| from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids | ||
| from pyiceberg.table.update import ( | ||
| TableRequirement, | ||
| TableUpdate, | ||
|
|
@@ -444,6 +449,135 @@ def create_table_if_not_exists( | |
| except TableAlreadyExistsError: | ||
| return self.load_table(identifier) | ||
|
|
||
| def replace_table( | ||
| self, | ||
| identifier: str | Identifier, | ||
| schema: Schema | pa.Schema, | ||
| location: str | None = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> Table: | ||
| """Atomically replace a table's schema, spec, sort order, location, and properties. | ||
|
|
||
| The table UUID and history (snapshots, schemas, specs, sort orders) are preserved. | ||
| The current snapshot is cleared (main branch ref is removed). | ||
|
|
||
| Args: | ||
| identifier (str | Identifier): Table identifier. | ||
| schema (Schema): New table schema. | ||
| location (str | None): New table location. Defaults to the existing location. | ||
| partition_spec (PartitionSpec): New partition spec. | ||
| sort_order (SortOrder): New sort order. | ||
| properties (Properties): Properties to apply. Merged on top of the existing | ||
| table properties: keys present here override existing values; existing keys | ||
| not present here are preserved. To remove a property, follow up with a | ||
| transaction that removes it explicitly. | ||
|
|
||
| Returns: | ||
| Table: the replaced table instance. | ||
|
|
||
| Raises: | ||
| NoSuchTableError: If the table does not exist. | ||
| """ | ||
| return self.replace_table_transaction( | ||
| identifier, schema, location, partition_spec, sort_order, properties | ||
| ).commit_transaction() | ||
|
|
||
| @abstractmethod | ||
| def replace_table_transaction( | ||
| self, | ||
| identifier: str | Identifier, | ||
| schema: Schema | pa.Schema, | ||
| location: str | None = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> ReplaceTableTransaction: | ||
| """Create a ReplaceTableTransaction. | ||
|
|
||
| The transaction can be used to stage additional changes (schema evolution, | ||
| partition evolution, etc.) before committing. | ||
|
|
||
| Args: | ||
| identifier (str | Identifier): Table identifier. | ||
| schema (Schema): New table schema. | ||
| location (str | None): New table location. Defaults to the existing location. | ||
| partition_spec (PartitionSpec): New partition spec. | ||
| sort_order (SortOrder): New sort order. | ||
| properties (Properties): Properties to apply. Merged on top of the existing | ||
| table properties: keys present here override existing values; existing keys | ||
| not present here are preserved. To remove a property, follow up with a | ||
| transaction that removes it explicitly. | ||
|
|
||
| Returns: | ||
| ReplaceTableTransaction: A transaction for the replace operation. | ||
|
|
||
| Raises: | ||
| NoSuchTableError: If the table does not exist. | ||
| """ | ||
|
|
||
| def _replace_staged_table( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maps to Java's |
||
| self, | ||
| identifier: str | Identifier, | ||
| schema: Schema | pa.Schema, | ||
| location: str | None, | ||
| partition_spec: PartitionSpec, | ||
| sort_order: SortOrder, | ||
| properties: Properties, | ||
| ) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]: | ||
| """Load the existing table and build fresh schema/spec/sort-order for replacement. | ||
|
|
||
| - reuses existing field IDs by name (from the current schema) | ||
| - reuses partition field IDs by `(source, transform)` across all specs (v2+), | ||
| or carries forward the current spec with `VoidTransform`s (v1) | ||
| - reassigns sort field IDs against the fresh schema | ||
| - resolves `location` to the existing table's location when omitted | ||
|
|
||
| Returns: | ||
| A tuple `(staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location)`. | ||
| """ | ||
| existing_table = self.load_table(identifier) | ||
| existing_metadata = existing_table.metadata | ||
|
|
||
| requested_format_version = properties.get(TableProperties.FORMAT_VERSION) | ||
| if requested_format_version is not None and int(requested_format_version) < existing_metadata.format_version: | ||
| raise ValueError( | ||
| f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Java's |
||
| ) | ||
| resolved_format_version = ( | ||
| int(requested_format_version) if requested_format_version is not None else existing_metadata.format_version | ||
| ) | ||
| iceberg_schema = self._convert_schema_if_needed(schema, cast(TableVersion, resolved_format_version)) | ||
| iceberg_schema.check_format_version_compatibility(cast(TableVersion, resolved_format_version)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same call |
||
|
|
||
| fresh_schema, _ = assign_fresh_schema_ids_for_replace( | ||
| iceberg_schema, existing_metadata.schema(), existing_metadata.last_column_id | ||
| ) | ||
|
|
||
| fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace( | ||
| partition_spec, | ||
| iceberg_schema, | ||
| fresh_schema, | ||
| existing_metadata.partition_specs, | ||
| existing_metadata.last_partition_id, | ||
| format_version=existing_metadata.format_version, | ||
| current_spec=existing_metadata.spec(), | ||
| ) | ||
|
|
||
| fresh_sort_order = assign_fresh_sort_order_ids(sort_order, iceberg_schema, fresh_schema) | ||
|
|
||
| resolved_location = location.rstrip("/") if location else existing_metadata.location | ||
|
|
||
| staged_table = StagedTable( | ||
| identifier=existing_table.name(), | ||
| metadata=existing_metadata, | ||
| metadata_location=existing_table.metadata_location, | ||
| io=existing_table.io, | ||
| catalog=self, | ||
| ) | ||
| return staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location | ||
|
|
||
| @abstractmethod | ||
| def load_table(self, identifier: str | Identifier) -> Table: | ||
| """Load the table's metadata and returns the table instance. | ||
|
|
@@ -924,6 +1058,28 @@ def create_table_transaction( | |
| self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties) | ||
| ) | ||
|
|
||
| @override | ||
| def replace_table_transaction( | ||
| self, | ||
| identifier: str | Identifier, | ||
| schema: Schema | pa.Schema, | ||
| location: str | None = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> ReplaceTableTransaction: | ||
| staged_table, fresh_schema, fresh_spec, fresh_sort_order, resolved_location = self._replace_staged_table( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Heads-up on a deliberate scope cut: Java's |
||
| identifier, schema, location, partition_spec, sort_order, properties | ||
| ) | ||
| return ReplaceTableTransaction( | ||
| table=staged_table, | ||
| new_schema=fresh_schema, | ||
| new_spec=fresh_spec, | ||
| new_sort_order=fresh_sort_order, | ||
| new_location=resolved_location, | ||
| new_properties=properties, | ||
| ) | ||
|
|
||
| @override | ||
| def table_exists(self, identifier: str | Identifier) -> bool: | ||
| try: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default impl is just composition — every concrete catalog only needs to override
replace_table_transaction. Mirrors howcreate_table_if_not_existsis structured on the base.