[lake/tiering] add table dropped handling and nullable write results#2920
beryllw wants to merge 6 commits into apache:main
Conversation
c74237d to ef27034
ef27034 to 2e0a904
leekeiabstraction
left a comment
TY for the PR, left a comment.
...flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java
luoyuxia
left a comment
Haven't finished the review yet, but left some comments.
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
- Removed redundant TableNotExistException catch block that re-threw the same exception
- Consolidated exception handling to reduce code duplication
- Maintained same error propagation behavior for all other exceptions
- Improved code readability by removing unnecessary exception wrapping
   * Completes the writing process and returns the write result.
   *
-  * @return the write result
+  * @return the write result, or null if no data was written (empty write scenario)
I'm curious when it will be null (no data written).
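To make the question concrete, here is a minimal sketch of one way a null result can arise and how a caller might guard against it. It assumes a writer that buffers records and has nothing to commit when no record arrived. `BufferingWriter` and `completeSafely` are hypothetical names for illustration, not classes from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch; none of these names come from the Fluss code base. */
public class NullableWriteResultSketch {

    /** Stand-in for a lake writer that buffers records until complete() is called. */
    static final class BufferingWriter {
        private final List<String> buffered = new ArrayList<>();

        void write(String record) {
            buffered.add(record);
        }

        /** Returns a summary of what was written, or null if nothing was buffered. */
        String complete() {
            return buffered.isEmpty() ? null : "committed:" + buffered.size();
        }
    }

    /** Wraps the nullable result so callers must handle the empty-write case explicitly. */
    static Optional<String> completeSafely(BufferingWriter writer) {
        return Optional.ofNullable(writer.complete());
    }
}
```

Whether the real writers can actually reach the empty case is exactly what the question above asks; the sketch only shows the shape of the contract.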
// Check if the error indicates table doesn't exist or tiering epoch is fenced
// (which happens when table is dropped and recreated)
if (errors == Errors.TABLE_NOT_EXIST
        || errors == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION
We should not handle UNKNOWN_TABLE_OR_BUCKET_EXCEPTION and FENCED_TIERING_EPOCH_EXCEPTION here.
@VisibleForTesting
protected void handleTableDropped(long tableId) {
    // Remove from tiering table epochs
    Long tieringEpoch = tieringTableEpochs.remove(tableId);
I feel it's error-prone to remove the table from tieringTableEpochs and put it into failedTableEpochs here.
Another option is to treat a table drop as just another failed tiering event (send a FailedTieringEvent), so that the whole path is unified.
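A rough sketch of the unified path suggested above, assuming both maps are keyed by table id and hold the tiering epoch. The `FailedTieringEvent` class here is a simplified stand-in with assumed fields, and `onFailedTiering` / `onTableDropped` are hypothetical method names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of routing a table drop through the normal failure path. */
public class UnifiedFailurePathSketch {

    /** Simplified stand-in for the event named in the review; its fields are assumptions. */
    static final class FailedTieringEvent {
        final long tableId;
        final String reason;

        FailedTieringEvent(long tableId, String reason) {
            this.tableId = tableId;
            this.reason = reason;
        }
    }

    final Map<Long, Long> tieringTableEpochs = new HashMap<>();
    final Map<Long, Long> failedTableEpochs = new HashMap<>();

    /** The single place where a table moves from "tiering" to "failed". */
    void onFailedTiering(FailedTieringEvent event) {
        Long epoch = tieringTableEpochs.remove(event.tableId);
        if (epoch != null) {
            failedTableEpochs.put(event.tableId, epoch);
        }
    }

    /** A dropped table is reported as a failure instead of mutating the maps directly. */
    void onTableDropped(long tableId) {
        onFailedTiering(new FailedTieringEvent(tableId, "table dropped"));
    }
}
```

With this shape, the two maps are only touched in one method, which is the property the comment is after.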
          ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

-  private static final int CURRENT_VERSION = 1;
+  private static final int CURRENT_VERSION = 2;
Actually, we may not need to bump the version, since it's only needed for shuffle. To keep the code clean, we can still keep CURRENT_VERSION = 1.
@Override
public void wakeUp() {
    // do nothing
please still keep the comment
}
lakeWriters.clear();

// Also handle pending snapshot splits for this table
We don't need to handle pending snapshot splits again, since every one of them must already be in currentTableSplitsByBucket. See the method:
private void addSplitToCurrentTable(TieringSplit split) {
    this.currentTableSplitsByBucket.put(split.getTableBucket(), split);
    if (split.isTieringSnapshotSplit()) {
        this.currentPendingSnapshotSplits.add((TieringSnapshotSplit) split);
    } else if (split.isTieringLogSplit()) {
        subscribeLog((TieringLogSplit) split);
    }
}
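The invariant implied by the method above can be checked with a simplified model. This is only a sketch: `Split` is a hypothetical stand-in that uses a string bucket key instead of the real split and bucket types, and it assumes each bucket is added at most once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the split bookkeeping quoted above. */
public class SplitBookkeepingSketch {

    /** Stand-in for a tiering split: a bucket key plus a snapshot/log flag. */
    static final class Split {
        final String bucket;
        final boolean snapshot;

        Split(String bucket, boolean snapshot) {
            this.bucket = bucket;
            this.snapshot = snapshot;
        }
    }

    final Map<String, Split> currentTableSplitsByBucket = new HashMap<>();
    final List<Split> currentPendingSnapshotSplits = new ArrayList<>();

    /** Mirrors the quoted method: every split goes into the map, snapshot splits also into the list. */
    void addSplitToCurrentTable(Split split) {
        currentTableSplitsByBucket.put(split.bucket, split);
        if (split.snapshot) {
            currentPendingSnapshotSplits.add(split);
        }
    }

    /** True when every pending snapshot split is also reachable through the map. */
    boolean pendingSnapshotsAreCovered() {
        return currentTableSplitsByBucket.values().containsAll(currentPendingSnapshotSplits);
    }
}
```

If the same bucket could be registered twice, the map entry would be overwritten and the invariant would no longer hold, so the argument depends on that assumption.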
    }
}

// Close any remaining lake writers that don't have corresponding splits
Do we really need this?
When would we have a lake writer but no corresponding split?
} catch (Exception e) {
    throw new IOException("Failed to complete Paimon write.", e);
}
if (commitMessage == null) {
Do Iceberg and Lance also need the same handling?
    currentTableSplitsByBucket.put(split.getTableBucket(), split);
}
LOG.info(
        "Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.",
nit:
- "Skipping RPC for dropped table {} (path: {}), will force complete in next fetch cycle.",
+ Table {} is already marked dropped; skip opening table {} and force complete in next fetch cycle.
"Skipping RPC" looks strange to me.
Purpose
Linked issue: close #2498
This PR improves Lake Tiering's handling of dropped tables and adds cancellation support for tiering operations.
Brief change log
Dropped Table Handling
in-progress splits with empty results instead of failing
Cancellation Support
Tests
API and Format
Documentation