-
Notifications
You must be signed in to change notification settings - Fork 519
[lake/tiering] add table dropped handling and nullable write results #2920
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
827ea64
2e0a904
ca316be
1c15b83
011aa0d
ef52bd9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.fluss.flink.tiering.event; | ||
|
|
||
| import org.apache.flink.api.connector.source.SourceEvent; | ||
|
|
||
| import java.util.Objects; | ||
|
|
||
| /** | ||
| * SourceEvent used to notify TieringSourceReader that a table has been dropped or recreated, and | ||
| * all pending splits for this table should be skipped. | ||
| */ | ||
| public class TieringTableDroppedEvent implements SourceEvent { | ||
|
|
||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| private final long tableId; | ||
|
|
||
| public TieringTableDroppedEvent(long tableId) { | ||
| this.tableId = tableId; | ||
| } | ||
|
|
||
| public long getTableId() { | ||
| return tableId; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (!(o instanceof TieringTableDroppedEvent)) { | ||
| return false; | ||
| } | ||
| TieringTableDroppedEvent that = (TieringTableDroppedEvent) o; | ||
| return tableId == that.tableId; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(tableId); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "TieringTableDroppedEvent{" + "tableId=" + tableId + '}'; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,10 @@ public class TableBucketWriteResultSerializer<WriteResult> | |
| private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = | ||
| ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); | ||
|
|
||
| private static final int CURRENT_VERSION = 1; | ||
| private static final int CURRENT_VERSION = 2; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, we may not need to bump the version since it's only needed for shuffle... For code clean, we can still keep it as |
||
|
|
||
| // Version 1: original format without the cancelled flag | ||
| private static final int VERSION_1 = 1; | ||
|
|
||
| private final org.apache.fluss.lake.serializer.SimpleVersionedSerializer<WriteResult> | ||
| writeResultSerializer; | ||
|
|
@@ -91,6 +94,9 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu | |
| // serialize number of write results | ||
| out.writeInt(tableBucketWriteResult.numberOfWriteResults()); | ||
|
|
||
| // serialize cancelled flag | ||
| out.writeBoolean(tableBucketWriteResult.isCancelled()); | ||
|
|
||
| final byte[] result = out.getCopyOfBuffer(); | ||
| out.clear(); | ||
| return result; | ||
|
|
@@ -99,7 +105,7 @@ public byte[] serialize(TableBucketWriteResult<WriteResult> tableBucketWriteResu | |
| @Override | ||
| public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] serialized) | ||
| throws IOException { | ||
| if (version != CURRENT_VERSION) { | ||
| if (version != CURRENT_VERSION && version != VERSION_1) { | ||
| throw new IOException("Unknown version " + version); | ||
| } | ||
| final DataInputDeserializer in = new DataInputDeserializer(serialized); | ||
|
|
@@ -125,7 +131,9 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria | |
| if (writeResultLength >= 0) { | ||
| byte[] writeResultBytes = new byte[writeResultLength]; | ||
| in.readFully(writeResultBytes); | ||
| writeResult = writeResultSerializer.deserialize(version, writeResultBytes); | ||
| writeResult = | ||
| writeResultSerializer.deserialize( | ||
| writeResultSerializer.getVersion(), writeResultBytes); | ||
| } else { | ||
| writeResult = null; | ||
| } | ||
|
|
@@ -136,13 +144,18 @@ public TableBucketWriteResult<WriteResult> deserialize(int version, byte[] seria | |
| long maxTimestamp = in.readLong(); | ||
| // deserialize number of write results | ||
| int numberOfWriteResults = in.readInt(); | ||
|
|
||
| // deserialize cancelled flag (added in version 2; default to false for version 1) | ||
| boolean cancelled = (version >= CURRENT_VERSION) && in.readBoolean(); | ||
|
|
||
| return new TableBucketWriteResult<>( | ||
| tablePath, | ||
| tableBucket, | ||
| partitionName, | ||
| writeResult, | ||
| logEndOffset, | ||
| maxTimestamp, | ||
| numberOfWriteResults); | ||
| numberOfWriteResults, | ||
| cancelled); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,41 +55,61 @@ public TieringSourceFetcherManager( | |
| } | ||
|
|
||
| public void markTableReachTieringMaxDuration(long tableId) { | ||
| LOG.info("Enqueueing handleTableReachTieringMaxDuration task for table {}", tableId); | ||
| enqueueTaskForTable( | ||
| tableId, | ||
| reader -> { | ||
| LOG.debug( | ||
| "Executing handleTableReachTieringMaxDuration in split reader for table {}", | ||
| tableId); | ||
| reader.handleTableReachTieringMaxDuration(tableId); | ||
| }, | ||
| "handleTableReachTieringMaxDuration"); | ||
| } | ||
|
|
||
| public void markTableDropped(long tableId) { | ||
| LOG.info("Enqueueing handleTableDropped task for table {}", tableId); | ||
| enqueueTaskForTable( | ||
| tableId, | ||
| reader -> { | ||
| LOG.debug("Executing handleTableDropped in split reader for table {}", tableId); | ||
| reader.handleTableDropped(tableId); | ||
| }, | ||
| "handleTableDropped"); | ||
| } | ||
|
|
||
| private void enqueueTaskForTable( | ||
| long tableId, Consumer<TieringSplitReader<WriteResult>> action, String actionDesc) { | ||
| SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher; | ||
| if (!fetchers.isEmpty()) { | ||
| // The fetcher thread is still running. This should be the majority of the cases. | ||
| LOG.info("fetchers is not empty, marking tiering max duration for table {}", tableId); | ||
| fetchers.values() | ||
| .forEach( | ||
| splitFetcher -> | ||
| enqueueMarkTableReachTieringMaxDurationTask( | ||
| splitFetcher, tableId)); | ||
| LOG.info("Fetchers are active, enqueueing {} task for table {}", actionDesc, tableId); | ||
| fetchers.values().forEach(f -> enqueueReaderTask(f, action)); | ||
| } else { | ||
| SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher = | ||
| createSplitFetcher(); | ||
| LOG.info( | ||
| "fetchers is empty, enqueue marking tiering max duration for table {}", | ||
| "No active fetchers, creating new fetcher and enqueueing {} task for table {}", | ||
| actionDesc, | ||
| tableId); | ||
| enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId); | ||
| splitFetcher = createSplitFetcher(); | ||
| enqueueReaderTask(splitFetcher, action); | ||
| startFetcher(splitFetcher); | ||
| } | ||
| } | ||
|
|
||
| private void enqueueMarkTableReachTieringMaxDurationTask( | ||
| @SuppressWarnings("unchecked") | ||
| private void enqueueReaderTask( | ||
| SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher, | ||
| long reachTieringDeadlineTable) { | ||
| Consumer<TieringSplitReader<WriteResult>> action) { | ||
| splitFetcher.enqueueTask( | ||
| new SplitFetcherTask() { | ||
| @Override | ||
| public boolean run() { | ||
| ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader()) | ||
| .handleTableReachTieringMaxDuration(reachTieringDeadlineTable); | ||
| action.accept( | ||
| (TieringSplitReader<WriteResult>) splitFetcher.getSplitReader()); | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void wakeUp() { | ||
| // do nothing | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please still keep the comment |
||
| } | ||
| public void wakeUp() {} | ||
| }); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious about when it'll be null (no data written)