[lake/tiering] add table dropped handling and nullable write results#2920

Open
beryllw wants to merge 6 commits into apache:main from beryllw:tiering-droptbl

Contributor

@beryllw beryllw commented Mar 24, 2026

Purpose

Linked issue: close #2498

This PR improves Lake Tiering's handling of dropped tables and adds cancellation support for tiering operations.

Brief change log

Dropped Table Handling

  • When a table is dropped during active tiering, the TieringSplitReader now gracefully completes all
    in-progress splits with empty results instead of failing
  • Added handleTableDropped() to mark dropped tables and trigger force completion
  • Dropped tables are properly cleaned up from LakeTableTieringManager to prevent resource leaks
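A minimal sketch of the force-completion behavior described above, assuming a much simplified reader. `DroppedTableSketch`, its fields, and `fetch()` are hypothetical stand-ins for illustration and are not the actual `TieringSplitReader` code; only the name `handleTableDropped` comes from this PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified reader: tracks in-progress splits per bucket.
class DroppedTableSketch {
    // bucket id -> table id of the split currently being tiered (assumed layout)
    private final Map<Integer, Long> inProgressSplits = new LinkedHashMap<>();
    private final Set<Long> droppedTables = new HashSet<>();

    void addSplit(int bucketId, long tableId) {
        inProgressSplits.put(bucketId, tableId);
    }

    // Only marks the table; the actual completion happens in the next fetch cycle.
    void handleTableDropped(long tableId) {
        droppedTables.add(tableId);
    }

    // Returns the buckets that were force-completed with an empty result.
    List<Integer> fetch() {
        List<Integer> forceCompleted = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = inProgressSplits.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> split = it.next();
            if (droppedTables.contains(split.getValue())) {
                forceCompleted.add(split.getKey());
                it.remove(); // finish the split instead of failing the reader
            }
        }
        // Forget the dropped tables so no bookkeeping is leaked.
        droppedTables.clear();
        return forceCompleted;
    }

    int inProgressCount() {
        return inProgressSplits.size();
    }

    int trackedDroppedTables() {
        return droppedTables.size();
    }
}
```

In this sketch, splits of other tables are left untouched, and a second `fetch()` after the drop has nothing left to complete.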

Cancellation Support

  • Added cancelled flag to TableBucketWriteResult to distinguish between normal and cancelled tiering rounds
  • Committers skip commit processing for cancelled results and report back to the coordinator
  • Lake writers are closed without completing when tiering is cancelled, discarding uncommitted data
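The cancellation flow above could look roughly like the following sketch. `WriteResult` and `Committer` are hypothetical simplifications (the real `TableBucketWriteResult` and committer operator carry more state), and the string payload stands in for a lake write result that may be null when nothing was written.

```java
import java.util.ArrayList;
import java.util.List;

class CancellationSketch {
    // Hypothetical stand-in for TableBucketWriteResult.
    static final class WriteResult {
        final long tableId;
        final String payload;    // nullable: null means no data was written
        final boolean cancelled; // true when the tiering round was cancelled

        WriteResult(long tableId, String payload, boolean cancelled) {
            this.tableId = tableId;
            this.payload = payload;
            this.cancelled = cancelled;
        }
    }

    // Hypothetical committer: commits normal results, reports cancelled ones.
    static final class Committer {
        final List<String> committed = new ArrayList<>();
        final List<Long> reportedCancelled = new ArrayList<>();

        void handle(WriteResult result) {
            if (result.cancelled) {
                // Skip commit processing and report back (to the coordinator in the PR).
                reportedCancelled.add(result.tableId);
                return;
            }
            if (result.payload != null) {
                committed.add(result.payload);
            }
            // A null payload on a normal result is an empty write: nothing to commit.
        }
    }
}
```

The point of the separate flag in this sketch is that a null payload alone cannot distinguish "nothing to write" from "round cancelled".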

Tests

API and Format

Documentation

@beryllw beryllw force-pushed the tiering-droptbl branch 3 times, most recently from c74237d to ef27034 Compare March 25, 2026 05:04
Contributor

@leekeiabstraction leekeiabstraction left a comment

TY for the PR, left a comment.

Contributor

@luoyuxia luoyuxia left a comment

Haven't finished the review, but left some comments.

beryllw added 3 commits March 30, 2026 22:46
- Removed redundant TableNotExistException catch block that re-threw the same exception
- Consolidated exception handling to reduce code duplication
- Maintained same error propagation behavior for all other exceptions
- Improved code readability by removing unnecessary exception wrapping
@beryllw beryllw requested a review from luoyuxia March 31, 2026 02:13
Contributor

@luoyuxia luoyuxia left a comment

@beryllw Thanks for the PR. Left some comments. PTAL.

* Completes the writing process and returns the write result.
*
- * @return the write result
+ * @return the write result, or null if no data was written (empty write scenario)
Contributor

I'm curious when it would be null (no data written).

// Check if the error indicates table doesn't exist or tiering epoch is fenced
// (which happens when table is dropped and recreated)
if (errors == Errors.TABLE_NOT_EXIST
|| errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION
Contributor

We should not handle UNKNOWN_TABLE_OR_BUCKET_EXCEPTION and FENCED_TIERING_EPOCH_EXCEPTION here.

@VisibleForTesting
protected void handleTableDropped(long tableId) {
// Remove from tiering table epochs
Long tieringEpoch = tieringTableEpochs.remove(tableId);
Contributor

I feel it's error-prone here to remove the table from tieringTableEpochs and put it into failedTableEpochs.
Another way is to treat a table drop as just another failed tiering event (send a FailedTieringEvent), so that the whole path is unified.
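The unified path suggested in this comment might be sketched as follows. The names `tieringTableEpochs`, `failedTableEpochs`, and `FailedTieringEvent` come from the discussion, but the class shapes, the `startTiering` helper, and the event fields are assumptions made for illustration, not the actual enumerator code.

```java
import java.util.HashMap;
import java.util.Map;

class UnifiedFailureSketch {
    // Hypothetical event shape; the real FailedTieringEvent may carry different fields.
    static final class FailedTieringEvent {
        final long tableId;
        final String reason;

        FailedTieringEvent(long tableId, String reason) {
            this.tableId = tableId;
            this.reason = reason;
        }
    }

    static final class Enumerator {
        final Map<Long, Long> tieringTableEpochs = new HashMap<>();
        final Map<Long, Long> failedTableEpochs = new HashMap<>();

        // Hypothetical helper to register a table that is being tiered.
        void startTiering(long tableId, long epoch) {
            tieringTableEpochs.put(tableId, epoch);
        }

        // Single place that moves a table from "tiering" to "failed".
        void handleFailedTieringEvent(FailedTieringEvent event) {
            Long epoch = tieringTableEpochs.remove(event.tableId);
            if (epoch != null) {
                failedTableEpochs.put(event.tableId, epoch);
            }
        }

        // A table drop is routed through the same failure path instead of
        // manipulating both maps directly.
        void handleTableDropped(long tableId) {
            handleFailedTieringEvent(new FailedTieringEvent(tableId, "table dropped"));
        }
    }
}
```

With this shape, only `handleFailedTieringEvent` touches the two maps, so the drop case cannot drift from the regular failure case.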

ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

- private static final int CURRENT_VERSION = 1;
+ private static final int CURRENT_VERSION = 2;
Contributor

Actually, we may not need to bump the version since it's only needed for shuffle... To keep the code clean, we can still keep it as CURRENT_VERSION = 1
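To illustrate the reasoning, here is a sketch of a version-prefixed serializer, assuming (as the comment states) that the bytes are only exchanged between operators of the same running job and never persisted. The class, the field layout, and the use of `java.io` streams are hypothetical; the real serializer uses `DataOutputSerializer` and a different layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class VersionedSerializerSketch {
    static final int CURRENT_VERSION = 1;

    // Writes: version, table id, cancelled flag (assumed layout).
    static byte[] serialize(long tableId, boolean cancelled) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(CURRENT_VERSION);
            out.writeLong(tableId);
            out.writeBoolean(cancelled);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the cancelled flag back, rejecting unknown versions.
    static boolean deserializeCancelled(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            if (version != CURRENT_VERSION) {
                throw new IOException("Unsupported version: " + version);
            }
            in.readLong(); // table id, not needed here
            return in.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because writer and reader in this sketch always come from the same build, adding the flag changes both sides at once; a version bump would matter only if old bytes could be read by new code.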


@Override
public void wakeUp() {
// do nothing
Contributor

Please keep the comment.

}
lakeWriters.clear();

// Also handle pending snapshot splits for this table
Contributor

We don't need to handle pending snapshot splits again, since they must already be in currentTableSplitsByBucket. See this method:

private void addSplitToCurrentTable(TieringSplit split) {
    this.currentTableSplitsByBucket.put(split.getTableBucket(), split);
    if (split.isTieringSnapshotSplit()) {
        this.currentPendingSnapshotSplits.add((TieringSnapshotSplit) split);
    } else if (split.isTieringLogSplit()) {
        subscribeLog((TieringLogSplit) split);
    }
}

}
}

// Close any remaining lake writers that don't have corresponding splits
Contributor

Do we really need it?
When would it happen that we have a lake writer but no corresponding splits?

} catch (Exception e) {
throw new IOException("Failed to complete Paimon write.", e);
}
if (commitMessage == null) {
Contributor

Do Iceberg and Lance also need the same handling?

currentTableSplitsByBucket.put(split.getTableBucket(), split);
}
LOG.info(
"Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.",
Contributor

nit:

Suggested change
"Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.",
Table {} is already marked dropped; skip opening table {} and force complete in next fetch cycle.

Skipping RPC looks strange to me

Development

Successfully merging this pull request may close these issues.

TieringService will stuck when drop the table that is tiering

3 participants