diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 2990054999..99ea53cf06 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -103,6 +104,14 @@ public static Cluster sendMetadataRequestAndRebuildCluster( MetadataRequest metadataRequest = ClientRpcMessageUtils.makeMetadataRequest( tablePaths, tablePartitions, tablePartitionIds); + // Collect the table paths for which partition metadata is being refreshed, so that stale + // partition entries can be removed during partial updates. + final Set refreshedPartitionTables = new HashSet<>(); + if (tablePartitions != null) { + for (PhysicalTablePath ptp : tablePartitions) { + refreshedPartitionTables.add(ptp.getTablePath()); + } + } return gateway.metadata(metadataRequest) .thenApply( response -> { @@ -135,6 +144,28 @@ public static Cluster sendMetadataRequestAndRebuildCluster( newPartitionIdByPath = new HashMap<>(originCluster.getPartitionIdByPath()); + // Remove stale partition entries for tables whose partition + // metadata was refreshed. The response only contains currently + // existing partitions, so any entry not in the response is stale + // (e.g., dropped partitions). + if (!refreshedPartitionTables.isEmpty()) { + newPartitionIdByPath + .keySet() + .removeIf( + path -> + refreshedPartitionTables.contains( + path.getTablePath())); + newBucketLocations + .keySet() + .removeIf( + path -> + path.getPartitionName() != null + && refreshedPartitionTables + .contains( + path + .getTablePath())); + } + newTablePathToTableId.putAll(newTableMetadata.tablePathToTableId); newBucketLocations.putAll(newTableMetadata.bucketLocations); newPartitionIdByPath.putAll(newTableMetadata.partitionIdByPath); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 80f9b75d17..f9a9d25b3a 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -113,6 +113,18 @@ public final class RecordAccumulator { private final ConcurrentMap writeBatches = new CopyOnWriteMap<>(); + /** + * Paths that have been marked stale (e.g. the partition was dropped). New appends to these + * paths are rejected. The Sender thread removes an entry from {@link #writeBatches} once all + * its deques are drained. + * + *

All accesses must be guarded by {@link #staleLock}. + */ + private final Set stalePaths = new HashSet<>(); + + /** Guards {@link #stalePaths} and the remove-if-empty logic in {@link #writeBatches}. */ + private final Object staleLock = new Object(); + private final IncompleteBatches incomplete; private final Map nodesDrainIndex; @@ -185,6 +197,17 @@ public RecordAppendResult append( // The metadata may return null for the partition id, but it is fine to pass null here, // because we will fill the partitionId in bucketReady() before send the batch. Optional partitionIdOpt = cluster.getPartitionId(physicalTablePath); + + // Reject appends to paths that the Sender has marked as stale (e.g. dropped partitions). + // This check must happen before computeIfAbsent so that a concurrent markPathsAsStale + + // removeStalePathIfEmpty cannot race with a new append re-inserting the same path. + synchronized (staleLock) { + if (stalePaths.contains(physicalTablePath)) { + throw new IllegalStateException( + "Cannot append to a stale (dropped) partition: " + physicalTablePath); + } + } + BucketAndWriteBatches bucketAndWriteBatches = writeBatches.computeIfAbsent( physicalTablePath, @@ -428,6 +451,50 @@ public Set getPhysicalTablePathsInBatches() { return writeBatches.keySet(); } + /** + * Mark the given physical table paths as stale. Subsequent {@link #append} calls for these + * paths will be rejected. The Sender should call {@link #removeStalePathIfEmpty} once a path's + * deques have been fully drained. + * + * @param paths the paths to mark as stale (e.g. dropped partitions) + */ + public void markPathsAsStale(Set paths) { + synchronized (staleLock) { + stalePaths.addAll(paths); + } + } + + /** + * Remove a stale path from {@link #writeBatches} if and only if all its deques are empty. + * + *

This is safe to call from the Sender thread. The {@link #staleLock} prevents a concurrent + * {@link #append} from sneaking in between the emptiness check and the remove. + * + * @param path the stale path to try to remove + * @return {@code true} if the path was removed, {@code false} if it still had pending data + */ + public boolean removeStalePathIfEmpty(PhysicalTablePath path) { + synchronized (staleLock) { + BucketAndWriteBatches entry = writeBatches.get(path); + if (entry == null) { + stalePaths.remove(path); + return true; + } + for (Deque deque : entry.batches.values()) { + synchronized (deque) { + if (!deque.isEmpty()) { + return false; + } + } + } + // All deques are empty under staleLock: no concurrent append can insert new data + // because append() checks stalePaths while holding staleLock before computeIfAbsent. + writeBatches.remove(path); + stalePaths.remove(path); + return true; + } + } + private List allocateMemorySegments( WriteRecord writeRecord, PhysicalTablePath physicalTablePath) throws IOException { int pagesPerBatch = diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java index 7b456d4bd9..9ce66c0320 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java @@ -231,6 +231,11 @@ private void sendWriteData() throws Exception { readyCheckResult.unknownLeaderTables); } + // Clean up stale physical table path entries from the accumulator on every cycle. + // Dropped partitions will no longer appear in the cluster's bucket locations; any + // writeBatches entry whose deques are all empty can be safely removed. + cleanupStaleWriteBatches(metadataUpdater.getCluster()); + Set readyNodes = readyCheckResult.readyNodes; if (readyNodes.isEmpty()) { // TODO The method sendWriteData is in a busy loop. If there is no data continuously, it @@ -256,6 +261,37 @@ private void sendWriteData() throws Exception { } } + /** + * Mark physical table paths that no longer appear in the cluster as stale, then try to remove + * any stale path whose deques have been fully drained. + * + *

Marking prevents new appends to dropped partitions. Removal is deferred until all + * in-flight batches for a path have been drained, so no data is silently lost. + */ + private void cleanupStaleWriteBatches(Cluster cluster) { + Set clusterPaths = cluster.getBucketLocationsByPath().keySet(); + Set newStalePaths = new HashSet<>(); + for (PhysicalTablePath path : accumulator.getPhysicalTablePathsInBatches()) { + if (!clusterPaths.contains(path)) { + newStalePaths.add(path); + } + } + if (!newStalePaths.isEmpty()) { + LOG.debug( + "Marking {} stale physical table path(s) from write batches: {}", + newStalePaths.size(), + newStalePaths); + accumulator.markPathsAsStale(newStalePaths); + } + + // Try to remove every path that has been marked stale and is now fully drained. + for (PhysicalTablePath path : accumulator.getPhysicalTablePathsInBatches()) { + if (!clusterPaths.contains(path)) { + accumulator.removeStalePathIfEmpty(path); + } + } + } + private void completeBatch(ReadyWriteBatch readyWriteBatch) { if (idempotenceManager.idempotenceEnabled()) { idempotenceManager.handleCompletedBatch(readyWriteBatch); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 0e4ab9c97f..2dfdfb0ae4 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -574,6 +574,34 @@ private Cluster updateCluster(List bucketLocations) { Collections.emptyMap()); } + /** + * Create a cluster that contains all the given bucket locations for DATA1 table plus an extra + * physical table path with its single bucket. Used to test stale-path cleanup. + */ + private Cluster updateClusterWithExtra( + List bucketLocations, + PhysicalTablePath extraPath, + BucketLocation extraBucket) { + Map aliveTabletServersById = new HashMap<>(); + aliveTabletServersById.put(node1.id(), node1); + aliveTabletServersById.put(node2.id(), node2); + aliveTabletServersById.put(node3.id(), node3); + + Map> bucketsByPath = new HashMap<>(); + bucketsByPath.put(DATA1_PHYSICAL_TABLE_PATH, bucketLocations); + bucketsByPath.put(extraPath, Collections.singletonList(extraBucket)); + + Map tableIdByPath = new HashMap<>(); + tableIdByPath.put(DATA1_TABLE_PATH, DATA1_TABLE_ID); + tableIdByPath.put(extraPath.getTablePath(), DATA1_TABLE_ID); + return new Cluster( + aliveTabletServersById, + new ServerNode(0, "localhost", 89, ServerType.COORDINATOR), + bucketsByPath, + tableIdByPath, + Collections.emptyMap()); + } + private void delayedInterrupt(final Thread thread, final long delayMs) { Thread t = new Thread( @@ -602,6 +630,87 @@ private void verifyTableBucketInBatches( assertThat(tableBucketsInBatch).containsExactlyInAnyOrder(tb); } + @Test + void testMarkAndRemoveStalePathAfterDrain() throws Exception { + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + // batchTimeoutMs=0 so batches are immediately ready and drain() polls them out. + RecordAccumulator accum = createTestRecordAccumulator(0, 4 * 1024, 256, 64 * 1024); + + // Create a stale path representing a dropped partition. + PhysicalTablePath stalePath = + PhysicalTablePath.of(TablePath.of("test_db", "test_table"), "stale_partition"); + BucketLocation staleBucket = + new BucketLocation(stalePath, DATA1_TABLE_ID, 0, node1.id(), serverNodes); + Cluster clusterWithStale = + updateClusterWithExtra(Arrays.asList(bucket1, bucket2), stalePath, staleBucket); + + accum.append(createRecord(row), writeCallback, cluster, 0, false); + accum.append( + WriteRecord.forIndexedAppend( + DATA1_TABLE_INFO, + stalePath, + indexedRow(DATA1_ROW_TYPE, new Object[] {2, "b"}), + null), + writeCallback, + clusterWithStale, + 0, + false); + + assertThat(accum.getPhysicalTablePathsInBatches()).contains(DATA1_PHYSICAL_TABLE_PATH); + assertThat(accum.getPhysicalTablePathsInBatches()).contains(stalePath); + + // Mark stale: subsequent appends to stalePath must be rejected. + accum.markPathsAsStale(Collections.singleton(stalePath)); + assertThatThrownBy( + () -> + accum.append( + WriteRecord.forIndexedAppend( + DATA1_TABLE_INFO, + stalePath, + indexedRow(DATA1_ROW_TYPE, new Object[] {3, "c"}), + null), + writeCallback, + clusterWithStale, + 0, + false)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("stale"); + + // Drain and deallocate — deque becomes empty. + Map> drained = + accum.drain(clusterWithStale, Collections.singleton(node1.id()), Integer.MAX_VALUE); + for (List batches : drained.values()) { + for (ReadyWriteBatch b : batches) { + accum.deallocate(b.writeBatch()); + } + } + + // Now the deque is empty: removeStalePathIfEmpty must succeed. + boolean removed = accum.removeStalePathIfEmpty(stalePath); + assertThat(removed).isTrue(); + assertThat(accum.getPhysicalTablePathsInBatches()).doesNotContain(stalePath); + // The live path still has un-drained data and must not be removed. + assertThat(accum.getPhysicalTablePathsInBatches()).contains(DATA1_PHYSICAL_TABLE_PATH); + } + + @Test + void testRemoveStalePathReturnsFalseWhenDequeNonEmpty() throws Exception { + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + RecordAccumulator accum = createTestRecordAccumulator(4 * 1024, 64 * 1024); + + // Append a record but do NOT drain — deque is non-empty. + accum.append(createRecord(row), writeCallback, cluster, 0, false); + assertThat(accum.getPhysicalTablePathsInBatches()).contains(DATA1_PHYSICAL_TABLE_PATH); + + // Mark stale and attempt removal while there is still pending data. + accum.markPathsAsStale(Collections.singleton(DATA1_PHYSICAL_TABLE_PATH)); + boolean removed = accum.removeStalePathIfEmpty(DATA1_PHYSICAL_TABLE_PATH); + + assertThat(removed).isFalse(); + // Path must still be present because data is pending. + assertThat(accum.getPhysicalTablePathsInBatches()).contains(DATA1_PHYSICAL_TABLE_PATH); + } + @Test void testDrainContinuesWhenBucketAtMaxInflight() throws Exception { int batchSize = 1024; diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java index 2c4d30e96a..8f27fff5ed 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java @@ -19,6 +19,7 @@ import org.apache.fluss.client.metadata.TestingMetadataUpdater; import org.apache.fluss.client.metrics.TestingWriterMetricGroup; +import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.ConfigOptions; @@ -801,6 +802,110 @@ void testSequenceNumberIncrement() throws Exception { assertThat(future1.get()).isNull(); } + /** + * Tests that Sender.runOnce() cleans up stale physical table path entries from the + * RecordAccumulator after a metadata update reveals that a partition no longer exists in the + * cluster. This prevents unbounded growth of writeBatches for long-running partition-write jobs + * where partitions are continuously created and dropped. + */ + @Test + void testSenderCleansUpStaleWriteBatchesAfterMetadataUpdate() throws Exception { + // Use a non-partitioned stale path so that ready() / drain() work without needing + // a partition ID in the cluster's partitionsIdByPath map. + PhysicalTablePath stalePath = PhysicalTablePath.of(TablePath.of("test_db", "stale_table")); + long staleTableId = 99901L; + ServerNode leader = TestingMetadataUpdater.NODE1; + int[] replicas = new int[] {leader.id()}; + BucketLocation staleBucket = + new BucketLocation(stalePath, staleTableId, 0, leader.id(), replicas); + + // Build a cluster that includes the normal path AND the soon-to-be-dropped stale path. + Cluster clusterWithStale = + new Cluster( + metadataUpdater.getCluster().getAliveTabletServers(), + metadataUpdater.getCluster().getCoordinatorServer(), + addExtraPath( + metadataUpdater.getCluster().getBucketLocationsByPath(), + stalePath, + staleBucket), + addExtraTableId( + metadataUpdater.getCluster().getTableIdByPath(), + stalePath.getTablePath(), + staleTableId), + metadataUpdater.getCluster().getPartitionIdByPath()); + metadataUpdater.updateCluster(clusterWithStale); + + // Create a minimal TableInfo for the stale table so we can append to it. + TableInfo staleTableInfo = + TableInfo.of( + stalePath.getTablePath(), + staleTableId, + 1, + DATA1_TABLE_INFO.toTableDescriptor(), + null, + System.currentTimeMillis(), + System.currentTimeMillis()); + + // Append a record to the stale path — this registers it in writeBatches. + accumulator.append( + WriteRecord.forArrowAppend(staleTableInfo, stalePath, row(1, "a"), null), + (tb, leo, e) -> {}, + clusterWithStale, + 0, + false); + + assertThat(accumulator.getPhysicalTablePathsInBatches()).contains(stalePath); + + // Drain and deallocate to empty the stale deque. batchTimeoutMs=0 so the batch is + // immediately ready and drain() will poll it out. + RecordAccumulator.ReadyCheckResult ready = accumulator.ready(clusterWithStale); + Map> batches = + accumulator.drain(clusterWithStale, ready.readyNodes, MAX_REQUEST_SIZE); + for (List batchList : batches.values()) { + for (ReadyWriteBatch b : batchList) { + accumulator.deallocate(b.writeBatch()); + } + } + + // Simulate partition drop: update the cluster without the stale path. + Map> bucketsWithoutStale = + new HashMap<>(metadataUpdater.getCluster().getBucketLocationsByPath()); + bucketsWithoutStale.remove(stalePath); + Map tableIdsWithoutStale = + new HashMap<>(metadataUpdater.getCluster().getTableIdByPath()); + tableIdsWithoutStale.remove(stalePath.getTablePath()); + Cluster clusterWithoutStale = + new Cluster( + metadataUpdater.getCluster().getAliveTabletServers(), + metadataUpdater.getCluster().getCoordinatorServer(), + bucketsWithoutStale, + tableIdsWithoutStale, + metadataUpdater.getCluster().getPartitionIdByPath()); + metadataUpdater.updateCluster(clusterWithoutStale); + + // runOnce() calls cleanupStaleWriteBatches: marks the path stale, then removes it + // because the deque is already empty. + sender.runOnce(); + + assertThat(accumulator.getPhysicalTablePathsInBatches()).doesNotContain(stalePath); + } + + private static Map> addExtraPath( + Map> original, + PhysicalTablePath extraPath, + BucketLocation extraBucket) { + Map> result = new HashMap<>(original); + result.put(extraPath, Collections.singletonList(extraBucket)); + return result; + } + + private static Map addExtraTableId( + Map original, TablePath tablePath, long tableId) { + Map result = new HashMap<>(original); + result.put(tablePath, tableId); + return result; + } + @Test void testSendWhenDestinationIsNullInMetadata() throws Exception { long offset = 0;