Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1799,6 +1799,70 @@ async fn test_time_travel_by_tag_name() {
);
}

#[tokio::test]
async fn test_time_travel_conflicting_selectors_fail() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "time_travel_table").await;

let conflicted = table.copy_with_options(HashMap::from([
("scan.tag-name".to_string(), "snapshot1".to_string()),
("scan.snapshot-id".to_string(), "2".to_string()),
]));

let plan_err = conflicted
.new_read_builder()
.new_scan()
.plan()
.await
.expect_err("conflicting time-travel selectors should fail");

match plan_err {
Error::DataInvalid { message, .. } => {
assert!(
message.contains("Only one time-travel selector may be set"),
"unexpected conflict error: {message}"
);
assert!(
message.contains("scan.snapshot-id"),
"conflict error should mention scan.snapshot-id: {message}"
);
assert!(
message.contains("scan.tag-name"),
"conflict error should mention scan.tag-name: {message}"
);
}
other => panic!("unexpected error: {other:?}"),
}
}

#[tokio::test]
async fn test_time_travel_invalid_numeric_selector_fails() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog, "time_travel_table").await;

let invalid = table.copy_with_options(HashMap::from([(
"scan.snapshot-id".to_string(),
"not-a-number".to_string(),
)]));

let plan_err = invalid
.new_read_builder()
.new_scan()
.plan()
.await
.expect_err("invalid numeric time-travel selector should fail");

match plan_err {
Error::DataInvalid { message, .. } => {
assert!(
message.contains("Invalid value for scan.snapshot-id"),
"unexpected invalid selector error: {message}"
);
}
other => panic!("unexpected error: {other:?}"),
}
}

// ---------------------------------------------------------------------------
// Data evolution + drop column tests
// ---------------------------------------------------------------------------
Expand Down
81 changes: 81 additions & 0 deletions crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::arrow::array::{Array, Int32Array, StringArray};
Expand Down Expand Up @@ -57,6 +58,21 @@ async fn create_provider(table_name: &str) -> PaimonTableProvider {
PaimonTableProvider::try_new(table).expect("Failed to create table provider")
}

/// Builds a `PaimonTableProvider` for the table identified by
/// `("default", table_name)`, after applying `extra_options` to the table
/// via `copy_with_options`.
///
/// Panics if the table cannot be fetched from the catalog or the provider
/// cannot be constructed.
async fn create_provider_with_options(
    table_name: &str,
    extra_options: HashMap<String, String>,
) -> PaimonTableProvider {
    let catalog = create_catalog();
    let identifier = Identifier::new("default", table_name);

    let base_table = catalog
        .get_table(&identifier)
        .await
        .expect("Failed to get table");
    let table_with_options = base_table.copy_with_options(extra_options);

    PaimonTableProvider::try_new(table_with_options).expect("Failed to create table provider")
}

async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
let batches = collect_query(table_name, &format!("SELECT id, name FROM {table_name}"))
.await
Expand Down Expand Up @@ -469,6 +485,71 @@ async fn test_time_travel_by_tag_name() {
);
}

/// A table configured with `scan.tag-name` that is then queried with
/// `FOR SYSTEM_TIME AS OF 2` must fail with a conflict error naming both
/// `scan.tag-name` and `scan.snapshot-id`.
#[tokio::test]
async fn test_time_travel_conflicting_selectors_fail() {
    let tag_option = HashMap::from([("scan.tag-name".to_string(), "snapshot1".to_string())]);
    let provider = create_provider_with_options("time_travel_table", tag_option).await;

    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery");
    let ctx = SessionContext::new_with_config(config);
    ctx.register_table("time_travel_table", Arc::new(provider))
        .expect("Failed to register table");
    ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
        .expect("Failed to register relation planner");

    let query = "SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2";
    let data_frame = ctx
        .sql(query)
        .await
        .expect("time travel query should parse");
    let err = data_frame
        .collect()
        .await
        .expect_err("conflicting time-travel selectors should fail");

    let message = err.to_string();
    assert!(
        message.contains("Only one time-travel selector may be set"),
        "unexpected conflict error: {message}"
    );
    assert!(
        message.contains("scan.snapshot-id"),
        "conflict error should mention scan.snapshot-id: {message}"
    );
    assert!(
        message.contains("scan.tag-name"),
        "conflict error should mention scan.tag-name: {message}"
    );
}

/// A table configured with a non-numeric `scan.snapshot-id` must make a
/// plain `SELECT` fail with an "Invalid value" error when collected.
#[tokio::test]
async fn test_time_travel_invalid_numeric_selector_fails() {
    let bad_snapshot_option =
        HashMap::from([("scan.snapshot-id".to_string(), "not-a-number".to_string())]);
    let provider = create_provider_with_options("time_travel_table", bad_snapshot_option).await;

    let ctx = SessionContext::new();
    ctx.register_table("time_travel_table", Arc::new(provider))
        .expect("Failed to register table");

    let data_frame = ctx
        .sql("SELECT id, name FROM time_travel_table")
        .await
        .expect("query should parse");
    let err = data_frame
        .collect()
        .await
        .expect_err("invalid numeric time-travel selector should fail");

    let message = err.to_string();
    assert!(
        message.contains("Invalid value for scan.snapshot-id"),
        "unexpected invalid selector error: {message}"
    );
}

/// Verifies that data evolution merge correctly NULL-fills columns that no file in a
/// merge group provides (e.g. a newly added column after MERGE INTO on old rows).
/// Without the fix, `active_file_indices` would be empty and rows would be silently lost.
Expand Down
162 changes: 158 additions & 4 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pub struct CoreOptions<'a> {
options: &'a HashMap<String, String>,
}

/// The single, validated time-travel selector resolved from table options.
///
/// Produced by `CoreOptions::try_time_travel_selector`, which rejects
/// configurations that set more than one selector option.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TimeTravelSelector<'a> {
    /// Tag name taken from the `scan.tag-name` option, borrowed from the options map.
    TagName(&'a str),
    /// Snapshot id parsed as `i64` from the `scan.snapshot-id` option.
    SnapshotId(i64),
    /// Timestamp in milliseconds parsed as `i64` from the `scan.timestamp-millis` option.
    TimestampMillis(i64),
}

impl<'a> CoreOptions<'a> {
pub fn new(options: &'a HashMap<String, String>) -> Self {
Self { options }
Expand Down Expand Up @@ -94,25 +101,90 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(true)
}

/// Strictly parses the option `option_name` as an `i64`.
///
/// Returns `Ok(None)` when the option is absent and `Ok(Some(n))` when it
/// parses.
///
/// # Errors
///
/// Returns `Error::DataInvalid` (carrying the underlying parse error as
/// `source`) when the option is present but is not a valid `i64`.
fn parse_i64_option(&self, option_name: &'static str) -> crate::Result<Option<i64>> {
    self.options
        .get(option_name)
        .map(|value| {
            value
                .parse::<i64>()
                .map_err(|e| crate::Error::DataInvalid {
                    message: format!("Invalid value for {option_name}: '{value}'"),
                    source: Some(Box::new(e)),
                })
        })
        // Option<Result<i64>> -> Result<Option<i64>>
        .transpose()
}

/// Best-effort read of `scan.snapshot-id` as an `i64`.
///
/// This compatibility accessor is lossy: the result is `None` both when the
/// option is missing and when its value does not parse, and selector
/// conflicts are not checked. Internal time-travel planning should use
/// `try_time_travel_selector` instead.
pub fn scan_snapshot_id(&self) -> Option<i64> {
    let raw = self.options.get(SCAN_SNAPSHOT_ID_OPTION)?;
    raw.parse().ok()
}

/// Best-effort read of `scan.timestamp-millis` as an `i64`.
///
/// This compatibility accessor is lossy: the result is `None` both when the
/// option is missing and when its value does not parse, and selector
/// conflicts are not checked. Internal time-travel planning should use
/// `try_time_travel_selector` instead.
pub fn scan_timestamp_millis(&self) -> Option<i64> {
    let raw = self.options.get(SCAN_TIMESTAMP_MILLIS_OPTION)?;
    raw.parse().ok()
}

/// Tag name for time travel via `scan.tag-name`.
pub fn scan_tag_name(&self) -> Option<&str> {
/// Raw tag name accessor for `scan.tag-name`.
///
/// This compatibility accessor does not validate selector conflicts.
/// Internal time-travel planning should use `try_time_travel_selector`.
pub fn scan_tag_name(&self) -> Option<&'a str> {
self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
}

/// Lists the time-travel selector option names that are present in the
/// options map, in the fixed order tag name, snapshot id, timestamp millis.
///
/// Only key presence is checked; values are not parsed here.
fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
    [
        SCAN_TAG_NAME_OPTION,
        SCAN_SNAPSHOT_ID_OPTION,
        SCAN_TIMESTAMP_MILLIS_OPTION,
    ]
    .iter()
    .copied()
    .filter(|option_name| self.options.contains_key(*option_name))
    .collect()
}

/// Resolves the single time-travel selector configured in these options.
///
/// This is the semantic owner for selector mutual exclusion and strict
/// numeric parsing. Returns `Ok(None)` when no selector option is set.
///
/// # Errors
///
/// Returns `Error::DataInvalid` when more than one selector option is
/// present, or when the snapshot id / timestamp option is present but is
/// not a valid `i64`.
pub(crate) fn try_time_travel_selector(&self) -> crate::Result<Option<TimeTravelSelector<'a>>> {
    let configured = self.configured_time_travel_selectors();
    if configured.len() > 1 {
        let listed = configured.join(", ");
        return Err(crate::Error::DataInvalid {
            message: format!(
                "Only one time-travel selector may be set, found: {}",
                listed
            ),
            source: None,
        });
    }

    // At most one selector is present from here on; probe them in order.
    if let Some(tag_name) = self.scan_tag_name() {
        return Ok(Some(TimeTravelSelector::TagName(tag_name)));
    }
    if let Some(id) = self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? {
        return Ok(Some(TimeTravelSelector::SnapshotId(id)));
    }
    let timestamp = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)?;
    Ok(timestamp.map(TimeTravelSelector::TimestampMillis))
}

/// Explicit bucket key columns. If not set, defaults to primary keys for PK tables.
pub fn bucket_key(&self) -> Option<Vec<String>> {
self.options
Expand Down Expand Up @@ -230,4 +302,86 @@ mod tests {
assert_eq!(core.partition_default_name(), "NULL_PART");
assert!(!core.legacy_partition_name());
}

/// Setting both the tag-name and snapshot-id options must yield a
/// `DataInvalid` error that names both options.
#[test]
fn test_try_time_travel_selector_rejects_conflicting_selectors() {
    let mut options = HashMap::new();
    options.insert(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string());
    options.insert(SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string());
    let core = CoreOptions::new(&options);

    let err = core
        .try_time_travel_selector()
        .expect_err("conflicting selectors should fail");

    // Any variant other than `DataInvalid` is a test failure.
    let message = match err {
        crate::Error::DataInvalid { message, .. } => message,
        other => panic!("unexpected error: {other:?}"),
    };
    assert!(message.contains("Only one time-travel selector may be set"));
    assert!(message.contains(SCAN_TAG_NAME_OPTION));
    assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
}

/// Non-numeric values for the snapshot-id and timestamp-millis options must
/// each yield a `DataInvalid` error that names the offending option.
#[test]
fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
    // (option name, invalid raw value, expectation message), checked in order.
    let cases = [
        (
            SCAN_SNAPSHOT_ID_OPTION,
            "abc",
            "invalid snapshot id should fail",
        ),
        (
            SCAN_TIMESTAMP_MILLIS_OPTION,
            "xyz",
            "invalid timestamp millis should fail",
        ),
    ];

    for (option_name, raw_value, expectation) in cases {
        let options = HashMap::from([(option_name.to_string(), raw_value.to_string())]);
        let core = CoreOptions::new(&options);

        let err = core.try_time_travel_selector().expect_err(expectation);
        match err {
            crate::Error::DataInvalid { message, .. } => {
                assert!(message.contains(option_name));
            }
            other => panic!("unexpected error: {other:?}"),
        }
    }
}

/// Each selector option, set on its own with a valid value, must resolve to
/// the matching `TimeTravelSelector` variant.
#[test]
fn test_try_time_travel_selector_normalizes_valid_selector() {
    // (option name, raw value, expected selector, expectation message), checked in order.
    let cases = [
        (
            SCAN_TAG_NAME_OPTION,
            "tag1",
            TimeTravelSelector::TagName("tag1"),
            "tag selector",
        ),
        (
            SCAN_SNAPSHOT_ID_OPTION,
            "7",
            TimeTravelSelector::SnapshotId(7),
            "snapshot selector",
        ),
        (
            SCAN_TIMESTAMP_MILLIS_OPTION,
            "1234",
            TimeTravelSelector::TimestampMillis(1234),
            "timestamp selector",
        ),
    ];

    for (option_name, raw_value, expected, expectation) in cases {
        let options = HashMap::from([(option_name.to_string(), raw_value.to_string())]);
        let core = CoreOptions::new(&options);
        assert_eq!(
            core.try_time_travel_selector().expect(expectation),
            Some(expected)
        );
    }
}
}
1 change: 1 addition & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod data_file;
pub use data_file::*;

mod core_options;
pub(crate) use core_options::TimeTravelSelector;
pub use core_options::*;

mod schema;
Expand Down
Loading
Loading