Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.lineage;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.table.Table;

import org.apache.flink.api.connector.source.Boundedness;
Expand All @@ -38,8 +39,6 @@
*/
public class LineageUtils {

private static final String PAIMON_DATASET_PREFIX = "paimon://";

private static final Set<String> PAIMON_OPTION_KEYS =
CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet());

Expand All @@ -49,6 +48,7 @@ public class LineageUtils {
*/
private static Map<String, String> buildConfigMap(Table table) {
Map<String, String> config = new HashMap<>();
config.put("type", "paimon");
config.put("partition-keys", String.join(",", table.partitionKeys()));
config.put("primary-keys", String.join(",", table.primaryKeys()));

Expand All @@ -60,12 +60,12 @@ private static Map<String, String> buildConfigMap(Table table) {
}

/**
* Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://}
* scheme followed by the table's physical warehouse path, e.g. {@code
* "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}.
* Returns the lineage namespace for a Paimon table. The namespace is the warehouse path derived
* via {@link CatalogUtils#warehouse(String)}, e.g. {@code "s3://my-bucket/warehouse"} for
* object stores or {@code "file:/tmp/warehouse"} for local paths.
*/
public static String getNamespace(Table table) {
return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options());
return CatalogUtils.warehouse(CoreOptions.path(table.options()).toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,14 @@ class LineageUtilsTest {

@TempDir java.nio.file.Path temp;

private Path warehouse;
private Path tablePath;

@BeforeEach
void setUp() {
tablePath = new Path(temp.toUri().toString());
// mirror real Paimon layout: <warehouse>/<database>.db/<table>
warehouse = new Path(temp.toUri().toString());
tablePath = new Path(warehouse, "test_db.db/test_table");
}

private FileStoreTable createTable(
Expand All @@ -85,8 +88,8 @@ void testGetNamespace() throws Exception {

String namespace = LineageUtils.getNamespace(table);

assertThat(namespace).startsWith("paimon://");
assertThat(namespace).contains(tablePath.toString());
// namespace is the warehouse root (2 levels up from the table path)
assertThat(namespace).isEqualTo("file:" + warehouse.toUri().getPath());
}

@Test
Expand All @@ -102,7 +105,7 @@ void testSourceLineageVertexBounded() throws Exception {

LineageDataset dataset = vertex.datasets().get(0);
assertThat(dataset.name()).isEqualTo("paimon.db.src");
assertThat(dataset.namespace()).startsWith("paimon://");
assertThat(dataset.namespace()).isEqualTo("file:" + warehouse.toUri().getPath());
}

@Test
Expand All @@ -128,7 +131,7 @@ void testSinkLineageVertex() throws Exception {

LineageDataset dataset = vertex.datasets().get(0);
assertThat(dataset.name()).isEqualTo("paimon.db.sink");
assertThat(dataset.namespace()).startsWith("paimon://");
assertThat(dataset.namespace()).isEqualTo("file:" + warehouse.toUri().getPath());
}

@Test
Expand All @@ -144,6 +147,7 @@ void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception {

DatasetConfigFacet configFacet = (DatasetConfigFacet) facets.get("config");
Map<String, String> config = configFacet.config();
assertThat(config).containsEntry("type", "paimon");
assertThat(config).containsEntry("partition-keys", "f2");
assertThat(config).containsEntry("primary-keys", "f0,f2");
}
Expand Down