From 63fd05e7e66faac158aeb5dfb88f215ab9ba92e8 Mon Sep 17 00:00:00 2001 From: jkukreja Date: Wed, 1 Apr 2026 13:52:28 -0400 Subject: [PATCH] [flink] fix namespace for lineage for table path to warehouse path --- .../apache/paimon/flink/lineage/LineageUtils.java | 12 ++++++------ .../paimon/flink/lineage/LineageUtilsTest.java | 14 +++++++++----- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java index 110365c76ee4..ab4b77993386 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.lineage; import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.table.Table; import org.apache.flink.api.connector.source.Boundedness; @@ -38,8 +39,6 @@ */ public class LineageUtils { - private static final String PAIMON_DATASET_PREFIX = "paimon://"; - private static final Set PAIMON_OPTION_KEYS = CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet()); @@ -49,6 +48,7 @@ public class LineageUtils { */ private static Map buildConfigMap(Table table) { Map config = new HashMap<>(); + config.put("type", "paimon"); config.put("partition-keys", String.join(",", table.partitionKeys())); config.put("primary-keys", String.join(",", table.primaryKeys())); @@ -60,12 +60,12 @@ private static Map buildConfigMap(Table table) { } /** - * Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://} - * scheme followed by the table's physical warehouse path, e.g. {@code - * "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}. + * Returns the lineage namespace for a Paimon table. The namespace is the warehouse path derived + * via {@link CatalogUtils#warehouse(String)}, e.g. {@code "s3://my-bucket/warehouse"} for + * object stores or {@code "file:/tmp/warehouse"} for local paths. */ public static String getNamespace(Table table) { - return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options()); + return CatalogUtils.warehouse(CoreOptions.path(table.options()).toString()); } /** diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java index 62d601ec1b23..44c95cec8e06 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java @@ -54,11 +54,14 @@ class LineageUtilsTest { @TempDir java.nio.file.Path temp; + private Path warehouse; private Path tablePath; @BeforeEach void setUp() { - tablePath = new Path(temp.toUri().toString()); + // mirror real Paimon layout: /.db/ + warehouse = new Path(temp.toUri().toString()); + tablePath = new Path(warehouse, "test_db.db/test_table"); } private FileStoreTable createTable( @@ -85,8 +88,8 @@ void testGetNamespace() throws Exception { String namespace = LineageUtils.getNamespace(table); - assertThat(namespace).startsWith("paimon://"); - assertThat(namespace).contains(tablePath.toString()); + // namespace is the warehouse root (2 levels up from the table path) + assertThat(namespace).isEqualTo("file:" + warehouse.toUri().getPath()); } @Test @@ -102,7 +105,7 @@ void testSourceLineageVertexBounded() throws Exception { LineageDataset dataset = vertex.datasets().get(0); assertThat(dataset.name()).isEqualTo("paimon.db.src"); - assertThat(dataset.namespace()).startsWith("paimon://"); + assertThat(dataset.namespace()).isEqualTo("file:" + warehouse.toUri().getPath()); } @Test @@ -128,7 +131,7 @@ void testSinkLineageVertex() throws Exception { LineageDataset dataset = vertex.datasets().get(0); assertThat(dataset.name()).isEqualTo("paimon.db.sink"); - assertThat(dataset.namespace()).startsWith("paimon://"); + assertThat(dataset.namespace()).isEqualTo("file:" + warehouse.toUri().getPath()); } @Test @@ -144,6 +147,7 @@ void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception { DatasetConfigFacet configFacet = (DatasetConfigFacet) facets.get("config"); Map config = configFacet.config(); + assertThat(config).containsEntry("type", "paimon"); assertThat(config).containsEntry("partition-keys", "f2"); assertThat(config).containsEntry("primary-keys", "f0,f2"); }