Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
93d77d3
Implementation
smaheshwar-pltr Apr 19, 2026
41da8bd
Merge upstream/main into sm/replace-table
smaheshwar-pltr May 18, 2026
e649f95
Extend replace_table to all catalogs, harden semantics
smaheshwar-pltr May 18, 2026
8675256
Test RTAS: replace_table_transaction with atomic write
smaheshwar-pltr May 18, 2026
fd7e11f
Address review feedback (see PR description for full details)
smaheshwar-pltr May 18, 2026
b4d76c1
Address review feedback (round 2)
smaheshwar-pltr May 18, 2026
1efa6fe
Use manual Schema for plain replace_table, df.schema for RTAS
smaheshwar-pltr May 18, 2026
a813a92
Make replace_table_transaction @abstractmethod on Catalog
smaheshwar-pltr May 18, 2026
5f1e0dc
Reframe ReplaceTableTransaction.commit_transaction docstring
smaheshwar-pltr May 18, 2026
bafa06b
Trim ReplaceTableTransaction.commit_transaction docstring
smaheshwar-pltr May 18, 2026
1e6af57
Nits
smaheshwar-pltr May 18, 2026
14a6e03
Drop made-up property example from RTAS docs snippet
smaheshwar-pltr May 18, 2026
eaba3ea
Drop stray '# Replace table tests' divider in test_catalog_behaviors
smaheshwar-pltr May 18, 2026
a620054
Drop extra blank line left by divider removal
smaheshwar-pltr May 18, 2026
0847d20
Apply test review findings
smaheshwar-pltr May 18, 2026
4278b6a
Add niche but no-regret tests from audit follow-up
smaheshwar-pltr May 18, 2026
08c5b2d
Strip format-version from persisted properties on replace
smaheshwar-pltr May 18, 2026
feff7dd
Test coverage: persisted props, list/map fresh ids, sort-order reuse
smaheshwar-pltr May 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,39 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```

## Replace a table

Atomically replace an existing table's schema, partition spec, sort order, location, and properties. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). `replace_table` redefines the table in this way; `replace_table_transaction` lets you write new data alongside this change to permit RTAS (replace-table-as-select) workflows.

```python
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, LongType, StringType, BooleanType

new_schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=LongType(), required=False),
    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
    NestedField(field_id=3, name="active", field_type=BooleanType(), required=False),
)
catalog.replace_table(identifier="docs_example.bids", schema=new_schema)
```

Columns whose names already appear in the previous schema keep their existing field IDs, so existing data files remain readable when the new schema is a compatible superset. New columns are assigned fresh IDs above `last-column-id`.

Properties passed to `replace_table` are **merged** with the existing table properties (your values override; existing keys you don't pass are preserved). To remove a property as part of the replace, use `replace_table_transaction` and remove it explicitly within the transaction.

Use `replace_table_transaction` to stage additional changes (writes, property updates, schema evolution) before committing — for example, swap the schema and write new data atomically:

```python
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
    txn.append(df)
```

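To upgrade the table's format version as part of the replace, pass `format-version` in `properties`. Only upgrades are supported: requesting a version lower than the table's current format version raises a `ValueError`.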

```python
catalog.replace_table(identifier="docs_example.bids", schema=new_schema, properties={"format-version": "2"})
```

## Register a table

To register a table using existing metadata:
Expand Down
162 changes: 159 additions & 3 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,25 @@
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import ManifestFile
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec,
assign_fresh_partition_spec_ids_for_replace,
)
from pyiceberg.schema import Schema, assign_fresh_schema_ids_for_replace
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableResponse,
CreateTableTransaction,
ReplaceTableTransaction,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
from pyiceberg.table.update import (
TableRequirement,
TableUpdate,
Expand Down Expand Up @@ -444,6 +449,135 @@ def create_table_if_not_exists(
except TableAlreadyExistsError:
return self.load_table(identifier)

def replace_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> Table:
    """Replace an existing table's definition in one atomic commit.

    The schema, partition spec, sort order, location, and properties are redefined.
    The table UUID and history (snapshots, schemas, specs, sort orders) are kept,
    while the current snapshot is cleared (the main branch ref is removed).

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location. Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): Properties to apply. They are merged on top of the
            existing table properties: keys given here override existing values and
            existing keys that are not given here are preserved. To remove a property,
            follow up with a transaction that removes it explicitly.

    Returns:
        Table: the replaced table instance.

    Raises:
        NoSuchTableError: If the table does not exist.
    """
    # NOTE(review, PR author): the default implementation is pure composition, so a
    # concrete catalog only has to override replace_table_transaction. This mirrors
    # how create_table_if_not_exists is structured on the base class.
    replace_txn = self.replace_table_transaction(
        identifier, schema, location, partition_spec, sort_order, properties
    )
    return replace_txn.commit_transaction()

@abstractmethod
def replace_table_transaction(
self,
identifier: str | Identifier,
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
"""Create a ReplaceTableTransaction for an existing table.

Abstract: every concrete catalog must provide this. `replace_table` is built on
top of it and simply commits the transaction returned here.

The transaction can be used to stage additional changes (data writes, schema
evolution, partition evolution, property updates, etc.) before committing, so
the replace and those changes are committed together.

Args:
    identifier (str | Identifier): Table identifier.
    schema (Schema | pa.Schema): New table schema.
    location (str | None): New table location. Defaults to the existing location.
    partition_spec (PartitionSpec): New partition spec.
    sort_order (SortOrder): New sort order.
    properties (Properties): Properties to apply. Merged on top of the existing
        table properties: keys present here override existing values; existing keys
        not present here are preserved. To remove a property as part of the replace,
        remove it explicitly within the returned transaction.

Returns:
    ReplaceTableTransaction: A transaction for the replace operation.

Raises:
    NoSuchTableError: If the table does not exist.
"""

def _replace_staged_table(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None,
    partition_spec: PartitionSpec,
    sort_order: SortOrder,
    properties: Properties,
) -> tuple[StagedTable, Schema, PartitionSpec, SortOrder, str]:
    """Load the table being replaced and derive the ID-reassigned parts of its new definition.

    Shared bookkeeping for `replace_table_transaction` implementations:

    - reuses existing field IDs by name (from the current schema)
    - reuses partition field IDs by `(source, transform)` across all specs (v2+),
      or carries forward the current spec with `VoidTransform`s (v1)
    - reassigns sort field IDs against the fresh schema
    - resolves `location` to the existing table's location when omitted

    Args:
        identifier (str | Identifier): Identifier of the table to replace; it is loaded
            through `self.load_table`.
        schema (Schema | pa.Schema): Requested new schema.
        location (str | None): Requested new location. Trailing slashes are stripped;
            a falsy value falls back to the existing table location.
        partition_spec (PartitionSpec): Requested new partition spec.
        sort_order (SortOrder): Requested new sort order.
        properties (Properties): Requested properties. Only `format-version` is read
            here, to pick the version used for schema conversion and validation.

    Returns:
        A tuple `(staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location)`.
        The staged table wraps the *existing* metadata, metadata location and IO.

    Raises:
        ValueError: If `properties` requests a `format-version` lower than the table's
            current format version.
    """
    # NOTE(review, PR author): this maps to Java's TableMetadata.buildReplacement. All
    # the bookkeeping (fresh schema, partition spec, sort order, location resolution,
    # StagedTable construction) lives here so MetastoreCatalog and RestCatalog share
    # it, analogous to how _create_staged_table is factored.
    current_table = self.load_table(identifier)
    existing_metadata = current_table.metadata

    # NOTE(review, PR author): Java's buildReplacement reads format-version from the
    # properties and only upgrades. A downgrade is rejected explicitly; otherwise
    # _convert_schema_if_needed would run with v1 semantics while the actual upgrade
    # silently drops, producing a confusing mismatch.
    requested_format_version = properties.get(TableProperties.FORMAT_VERSION)
    if requested_format_version is None:
        target_version = existing_metadata.format_version
    else:
        target_version = int(requested_format_version)
        if target_version < existing_metadata.format_version:
            raise ValueError(
                f"Cannot downgrade format-version from {existing_metadata.format_version} to {requested_format_version}"
            )
    table_version = cast(TableVersion, target_version)

    # NOTE(review, PR author): the compatibility check is the same call
    # new_table_metadata makes, and the same check Java's Builder runs inside
    # addSchemaInternal. It catches v1-incompatible types up front rather than
    # failing later inside AddSchemaUpdate's apply path.
    converted_schema = self._convert_schema_if_needed(schema, table_version)
    converted_schema.check_format_version_compatibility(table_version)

    fresh_schema, _ = assign_fresh_schema_ids_for_replace(
        converted_schema,
        existing_metadata.schema(),
        existing_metadata.last_column_id,
    )

    # The partition helper is given the table's *current* format version, not the
    # requested one. NOTE(review): presumably intentional to match Java; confirm.
    fresh_partition_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec,
        converted_schema,
        fresh_schema,
        existing_metadata.partition_specs,
        existing_metadata.last_partition_id,
        format_version=existing_metadata.format_version,
        current_spec=existing_metadata.spec(),
    )

    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, converted_schema, fresh_schema)

    if location:
        resolved_location = location.rstrip("/")
    else:
        resolved_location = existing_metadata.location

    staged_table = StagedTable(
        identifier=current_table.name(),
        metadata=existing_metadata,
        metadata_location=current_table.metadata_location,
        io=current_table.io,
        catalog=self,
    )
    return staged_table, fresh_schema, fresh_partition_spec, fresh_sort_order, resolved_location

@abstractmethod
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down Expand Up @@ -924,6 +1058,28 @@ def create_table_transaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
)

@override
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Stage a replacement of an existing table and return the pending transaction.

    Delegates to `_replace_staged_table` for the ID reassignment and location
    resolution, then wraps the results in a `ReplaceTableTransaction`. The
    `properties` argument is handed to the transaction unchanged.
    """
    # NOTE(review, PR author): deliberate scope cut. Java's
    # RESTSessionCatalog.replaceTransaction does a view-existence pre-flight check
    # before replacing. PyIceberg does not currently do this in the equivalent
    # create_table / rename_table / register_table paths either, so the check is
    # deferred to a follow-up that ports it across all relevant call sites.
    replacement = self._replace_staged_table(
        identifier, schema, location, partition_spec, sort_order, properties
    )
    staged, schema_with_ids, spec_with_ids, sort_order_with_ids, target_location = replacement
    return ReplaceTableTransaction(
        table=staged,
        new_schema=schema_with_ids,
        new_spec=spec_with_ids,
        new_sort_order=sort_order_with_ids,
        new_location=target_location,
        new_properties=properties,
    )

@override
def table_exists(self, identifier: str | Identifier) -> bool:
try:
Expand Down
13 changes: 13 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.table import (
CommitTableResponse,
CreateTableTransaction,
ReplaceTableTransaction,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -68,6 +69,18 @@ def create_table_transaction(
) -> CreateTableTransaction:
raise NotImplementedError

@override
def replace_table_transaction(
self,
identifier: str | Identifier,
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
"""Not implemented here: unconditionally raises NotImplementedError.

Raises:
    NotImplementedError: Always, regardless of the arguments.
"""
raise NotImplementedError

@override
def load_table(self, identifier: str | Identifier) -> Table:
raise NotImplementedError
Expand Down
30 changes: 29 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,18 @@
FileIO,
load_file_io,
)
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.partitioning import (
UNPARTITIONED_PARTITION_SPEC,
PartitionSpec,
assign_fresh_partition_spec_ids,
)
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
FileScanTask,
ReplaceTableTransaction,
StagedTable,
Table,
TableIdentifier,
Expand Down Expand Up @@ -953,6 +958,29 @@ def create_table_transaction(
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
return CreateTableTransaction(staged_table)

@override
@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Stage a replacement of an existing table and return the pending transaction.

    Delegates to `_replace_staged_table` for the ID reassignment and location
    resolution, then wraps the results in a `ReplaceTableTransaction`. The
    `properties` argument is handed to the transaction unchanged. The call is
    wrapped by the `retry` decorator configured with `_RETRY_ARGS`.
    """
    replacement = self._replace_staged_table(
        identifier, schema, location, partition_spec, sort_order, properties
    )
    staged, schema_with_ids, spec_with_ids, sort_order_with_ids, target_location = replacement
    return ReplaceTableTransaction(
        table=staged,
        new_schema=schema_with_ids,
        new_spec=spec_with_ids,
        new_sort_order=sort_order_with_ids,
        new_location=target_location,
        new_properties=properties,
    )

@override
@retry(**_RETRY_ARGS)
def create_view(
Expand Down
Loading
Loading